From 5a16aaf7ad05cefde3f5564802c1e55bdb2d18d3 Mon Sep 17 00:00:00 2001
From: zavoraad
Date: Thu, 4 Jan 2024 16:32:13 +0000
Subject: [PATCH] init

---
 (Clone) code/ai_gateway.py                    |  65 +++
 (Clone) code/ang_data_preparation.sql         | 420 ++++++++++++++++++
 (Clone) code/ang_jsl.sql                      | 145 ++++++
 (Clone) code/mlflow_logging_ang.py            | 129 ++++++
 (Clone) code/mlflow_logging_nara.py           | 164 +++++++
 (Clone) code/model_serving.py                 |  74 +++
 (Clone) code/model_serving_test.py            |  74 +++
 ...] Contributing to Solution Accelerators.py | 127 ------
 00_introduction.py                            |   2 +
 01_data_preparation.py                        |  44 ++
 02_create_monitor.py                          |  43 ++
 03_train_llm.py                               | 230 ++++++++++
 04_model_prediction_premlflow.py              | 119 +++++
 05_mlflow_logging.py                          | 135 ++++++
 06_mlflow_batch_prediciton.py                 |  45 ++
 _Analysis.py                                  |   3 -
 _Introduction_And_Setup.py                    |   3 -
 util/data-extract.py                          |  52 ---
 util/generate-iot-data.py                     | 106 -----
 util/notebook-config.py                       |  33 --
 20 files changed, 1689 insertions(+), 324 deletions(-)
 create mode 100644 (Clone) code/ai_gateway.py
 create mode 100644 (Clone) code/ang_data_preparation.sql
 create mode 100644 (Clone) code/ang_jsl.sql
 create mode 100644 (Clone) code/mlflow_logging_ang.py
 create mode 100644 (Clone) code/mlflow_logging_nara.py
 create mode 100644 (Clone) code/model_serving.py
 create mode 100644 (Clone) code/model_serving_test.py
 delete mode 100644 00_[PLEASE READ] Contributing to Solution Accelerators.py
 create mode 100644 00_introduction.py
 create mode 100644 01_data_preparation.py
 create mode 100644 02_create_monitor.py
 create mode 100644 03_train_llm.py
 create mode 100644 04_model_prediction_premlflow.py
 create mode 100644 05_mlflow_logging.py
 create mode 100644 06_mlflow_batch_prediciton.py
 delete mode 100644 _Analysis.py
 delete mode 100644 _Introduction_And_Setup.py
 delete mode 100644 util/data-extract.py
 delete mode 100644 util/generate-iot-data.py
 delete mode 100644 util/notebook-config.py

diff --git a/(Clone) code/ai_gateway.py b/(Clone) code/ai_gateway.py
new file mode 100644
index 0000000..e60875e
--- /dev/null
+++ b/(Clone) code/ai_gateway.py
@@ -0,0 +1,65 @@
+# Databricks notebook source
+# NOTE: this cell depends on `client` and the GLD_* credentials defined in the
+# cells below; run those first when executing the notebook top to bottom.
+client.create_endpoint(
+    name="bedrock-anthropic-completions-endpoint",
+    config={
+        "served_entities": [
+            {
+                "external_model": {
+                    "name": "claude-v2",
+                    "provider": "aws-bedrock",
+                    "task": "llm/v1/completions",
+                    "aws_bedrock_config": {
+                        "aws_region": GLD_AWS_REGION,
+                        "aws_access_key_id": GLD_AWS_ACCESS_KEY_ID,
+                        "aws_secret_access_key": GLD_AWS_SECRET_ACCESS_KEY,
+                        "bedrock_provider": "anthropic"
+                    }
+                }
+            }
+        ]
+    }
+)
+
+# COMMAND ----------
+
+pip install mlflow --upgrade
+
+# COMMAND ----------
+
+dbutils.library.restartPython()
+
+# COMMAND ----------
+
+GLD_AWS_REGION = 'x'
+GLD_AWS_ACCESS_KEY_ID = 'y'
+GLD_AWS_SECRET_ACCESS_KEY = 'z'
+
+# COMMAND ----------
+
+import mlflow.deployments
+
+client = mlflow.deployments.get_deploy_client("databricks")
+
+client.create_endpoint(
+    name="bedrock-anthropic-completions-endpoint",
+    config={
+        "served_entities": [
+            {
+                "external_model": {
+                    "name": "claude-v2",
+                    "provider": "aws-bedrock",
+                    "task": "llm/v1/completions",
+                    "aws_bedrock_config": {
+                        "aws_region": GLD_AWS_REGION,
+                        "aws_access_key_id": GLD_AWS_ACCESS_KEY_ID,
+                        "aws_secret_access_key": GLD_AWS_SECRET_ACCESS_KEY,
+                        "bedrock_provider": "anthropic"
+                    }
+                }
+            }
+        ]
+    }
+)
+
+# COMMAND ----------
+
+token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()
+dbutils.fs.put("file:///root/.databrickscfg", "[DEFAULT]\nhost=https://e2-demo-field-eng.cloud.databricks.com\ntoken=" + 
token, overwrite=True) diff --git a/(Clone) code/ang_data_preparation.sql b/(Clone) code/ang_data_preparation.sql new file mode 100644 index 0000000..011b8ff --- /dev/null +++ b/(Clone) code/ang_data_preparation.sql @@ -0,0 +1,420 @@ +-- Databricks notebook source +-- MAGIC %md +-- MAGIC ## Preprocessing + +-- COMMAND ---------- + +-- MAGIC %python +-- MAGIC csv_path = "/Volumes/uc_demos_angelina_leigh/nara_ang_demo/nara_ang/clinical_notes_dataset.csv" + +-- COMMAND ---------- + +use catalog uc_demos_angelina_leigh; +use schema nara_ang_demo + +-- COMMAND ---------- + +CREATE OR REPLACE TEMPORARY VIEW clinical_notes +USING csv +OPTIONS ( + 'path' '/Volumes/uc_demos_angelina_leigh/nara_ang_demo/nara_ang/clinical_notes_dataset.csv', + 'header' 'true', + 'inferSchema' 'true' +); + +-- COMMAND ---------- + +SELECT + protocol_labels, + COUNT(*) AS protocol_counts +FROM + clinical_notes +WHERE + protocol_labels IS NOT NULL +GROUP BY + protocol_labels +HAVING + COUNT(*) >= 3 +--ORDER BY + --COUNT(*) DESC + +-- COMMAND ---------- + +describe extended clinical_notes + +-- COMMAND ---------- + +drop table if exists clinical_notes_table; +CREATE TABLE clinical_notes_table +as +select * +from clinical_notes + +-- COMMAND ---------- + +select * from clinical_notes_table limit 10 + +-- COMMAND ---------- + +select +protocol_labels, +count(*) +from clinical_notes_table +group by protocol_labels + +-- COMMAND ---------- + +-- MAGIC %md +-- MAGIC ## Setup + +-- COMMAND ---------- + +-- MAGIC %python +-- MAGIC !pip install -U pip +-- MAGIC !pip install accelerate==0.18.0 +-- MAGIC !pip install appdirs==1.4.4 +-- MAGIC !pip install bitsandbytes==0.37.2 +-- MAGIC !pip install datasets==2.10.1 +-- MAGIC !pip install fire==0.5.0 +-- MAGIC !pip install git+https://github.com/huggingface/peft.git +-- MAGIC !pip install git+https://github.com/huggingface/transformers.git +-- MAGIC !pip install torch==2.0.0 +-- MAGIC !pip install sentencepiece==0.1.97 +-- MAGIC !pip install tensorboardX==2.6 +-- MAGIC !pip install gradio==3.23.0 + +-- COMMAND ---------- + +-- MAGIC %python +-- MAGIC !pip install --upgrade transformers + +-- COMMAND ---------- + +-- MAGIC %python +-- MAGIC pip install --upgrade accelerate + +-- COMMAND ---------- + +-- MAGIC %python +-- MAGIC dbutils.library.restartPython() + +-- COMMAND ---------- + +-- MAGIC %python +-- MAGIC import transformers +-- MAGIC import textwrap +-- MAGIC from transformers import LlamaTokenizer, LlamaForCausalLM +-- MAGIC import os +-- MAGIC import sys +-- MAGIC from typing import List +-- MAGIC +-- MAGIC from peft import ( +-- MAGIC LoraConfig, +-- MAGIC get_peft_model, +-- MAGIC get_peft_model_state_dict, +-- MAGIC prepare_model_for_int8_training, +-- MAGIC ) +-- MAGIC +-- MAGIC import fire +-- MAGIC import torch +-- MAGIC from datasets import load_dataset +-- MAGIC import pandas as pd +-- MAGIC +-- MAGIC import matplotlib.pyplot as plt +-- MAGIC import matplotlib as mpl +-- MAGIC import seaborn as sns +-- MAGIC from pylab import rcParams +-- MAGIC import json +-- MAGIC +-- MAGIC %matplotlib inline +-- MAGIC sns.set(rc={'figure.figsize':(8, 6)}) +-- MAGIC sns.set(rc={'figure.dpi':100}) +-- MAGIC sns.set(style='white', palette='muted', font_scale=1.2) + +-- COMMAND ---------- + +-- MAGIC %python +-- MAGIC !gdown 1xQ89cpZCnafsW5T3G3ZQWvR7q682t2BN + +-- COMMAND ---------- + +-- MAGIC %md +-- MAGIC ## Alpaca LoRa + +-- COMMAND ---------- + +-- MAGIC %python +-- MAGIC BASE_MODEL = "decapoda-research/llama-7b-hf" +-- MAGIC +-- MAGIC model = LlamaForCausalLM.from_pretrained( 
+-- MAGIC BASE_MODEL, +-- MAGIC load_in_8bit=True, +-- MAGIC torch_dtype=torch.float16, +-- MAGIC device_map="auto", +-- MAGIC ) +-- MAGIC +-- MAGIC tokenizer = LlamaTokenizer.from_pretrained(BASE_MODEL) +-- MAGIC +-- MAGIC tokenizer.pad_token_id = ( +-- MAGIC 0 # unk. we want this to be different from the eos token +-- MAGIC ) +-- MAGIC tokenizer.padding_side = "left" + +-- COMMAND ---------- + +select * from uc_demos_angelina_leigh.nara_ang_demo.clinical_notes_table + +-- COMMAND ---------- + +-- MAGIC %python +-- MAGIC df = spark.sql("SELECT * FROM uc_demos_angelina_leigh.nara_ang_demo.clinical_notes_table") + +-- COMMAND ---------- + +-- MAGIC %python +-- MAGIC df = df.toPandas() + +-- COMMAND ---------- + +-- MAGIC %python +-- MAGIC df.head() + +-- COMMAND ---------- + +-- MAGIC %python +-- MAGIC dataset_data = [ +-- MAGIC { +-- MAGIC "instruction": "predict protocol labels for clinical impressions", +-- MAGIC "input": row_dict["reasonofstudy"], +-- MAGIC "output": row_dict["protocol_labels"] +-- MAGIC } +-- MAGIC for row_dict in df.to_dict(orient="records") +-- MAGIC ] + +-- COMMAND ---------- + +CREATE OR REPLACE TEMPORARY VIEW clinical_notes_json +USING json +OPTIONS ( + 'path' '/Volumes/uc_demos_angelina_leigh/nara_ang_demo/nara_ang/clinical_notes_dataset.json', + 'header' 'true', + 'inferSchema' 'true' +); + +-- COMMAND ---------- + +drop table if exists clinical_notes_table_train; +CREATE TABLE clinical_notes_table_train +as +select * +from clinical_notes_json + +-- COMMAND ---------- + +select * from clinical_notes_table_train + +-- COMMAND ---------- + +-- MAGIC %python +-- MAGIC df_train = spark.sql("SELECT * FROM uc_demos_angelina_leigh.nara_ang_demo.clinical_notes_table_train") +-- MAGIC df_train = df_train.toPandas() + +-- COMMAND ---------- + +-- MAGIC %python +-- MAGIC CUTOFF_LEN = 256 + +-- COMMAND ---------- + +-- MAGIC %python +-- MAGIC def generate_prompt(data_point): +-- MAGIC return f"""Below is an instruction that describes a task, paired with an input that provides further context. Write a response that appropriately completes the request. 
# noqa: E501 +-- MAGIC ### Instruction: +-- MAGIC {data_point["instruction"]} +-- MAGIC ### Input: +-- MAGIC {data_point["input"]} +-- MAGIC ### Response: +-- MAGIC {data_point["output"]}""" + +-- COMMAND ---------- + +-- MAGIC %python +-- MAGIC def tokenize(prompt, add_eos_token=True): +-- MAGIC # there's probably a way to do this with the tokenizer settings +-- MAGIC # but again, gotta move fast +-- MAGIC result = tokenizer( +-- MAGIC prompt, +-- MAGIC truncation=True, +-- MAGIC max_length=CUTOFF_LEN, +-- MAGIC padding=False, +-- MAGIC return_tensors=None, +-- MAGIC ) +-- MAGIC if ( +-- MAGIC result["input_ids"][-1] != tokenizer.eos_token_id +-- MAGIC and len(result["input_ids"]) < CUTOFF_LEN +-- MAGIC and add_eos_token +-- MAGIC ): +-- MAGIC result["input_ids"].append(tokenizer.eos_token_id) +-- MAGIC result["attention_mask"].append(1) +-- MAGIC +-- MAGIC result["labels"] = result["input_ids"].copy() +-- MAGIC +-- MAGIC return result +-- MAGIC +-- MAGIC def generate_and_tokenize_prompt(data_point): +-- MAGIC full_prompt = generate_prompt(data_point) +-- MAGIC tokenized_full_prompt = tokenize(full_prompt) +-- MAGIC return tokenized_full_prompt + +-- COMMAND ---------- + +-- MAGIC %python +-- MAGIC df_train["train"] + +-- COMMAND ---------- + +-- MAGIC %python +-- MAGIC train_val = df_train["train"].train_test_split( +-- MAGIC test_size=200, shuffle=True, seed=42 +-- MAGIC ) +-- MAGIC train_data = ( +-- MAGIC train_val["train"].shuffle().map(generate_and_tokenize_prompt) +-- MAGIC ) +-- MAGIC val_data = ( +-- MAGIC train_val["test"].shuffle().map(generate_and_tokenize_prompt) +-- MAGIC ) + +-- COMMAND ---------- + +-- MAGIC %python +-- MAGIC LORA_R = 8 +-- MAGIC LORA_ALPHA = 16 +-- MAGIC LORA_DROPOUT= 0.05 +-- MAGIC LORA_TARGET_MODULES = [ +-- MAGIC "q_proj", +-- MAGIC "v_proj", +-- MAGIC ] +-- MAGIC +-- MAGIC BATCH_SIZE = 128 +-- MAGIC MICRO_BATCH_SIZE = 4 +-- MAGIC GRADIENT_ACCUMULATION_STEPS = BATCH_SIZE // MICRO_BATCH_SIZE +-- MAGIC LEARNING_RATE = 3e-4 +-- MAGIC TRAIN_STEPS = 300 +-- MAGIC OUTPUT_DIR = "experiments" + +-- COMMAND ---------- + +-- MAGIC %python +-- MAGIC model = prepare_model_for_int8_training(model) +-- MAGIC config = LoraConfig( +-- MAGIC r=LORA_R, +-- MAGIC lora_alpha=LORA_ALPHA, +-- MAGIC target_modules=LORA_TARGET_MODULES, +-- MAGIC lora_dropout=LORA_DROPOUT, +-- MAGIC bias="none", +-- MAGIC task_type="CAUSAL_LM", +-- MAGIC ) +-- MAGIC model = get_peft_model(model, config) +-- MAGIC model.print_trainable_parameters() + +-- COMMAND ---------- + +-- MAGIC %md +-- MAGIC ## Training + +-- COMMAND ---------- + +-- MAGIC %python +-- MAGIC training_arguments = transformers.TrainingArguments( +-- MAGIC per_device_train_batch_size=MICRO_BATCH_SIZE, +-- MAGIC gradient_accumulation_steps=GRADIENT_ACCUMULATION_STEPS, +-- MAGIC warmup_steps=100, +-- MAGIC max_steps=TRAIN_STEPS, +-- MAGIC learning_rate=LEARNING_RATE, +-- MAGIC fp16=True, +-- MAGIC logging_steps=10, +-- MAGIC optim="adamw_torch", +-- MAGIC evaluation_strategy="steps", +-- MAGIC save_strategy="steps", +-- MAGIC eval_steps=50, +-- MAGIC save_steps=50, +-- MAGIC output_dir=OUTPUT_DIR, +-- MAGIC save_total_limit=3, +-- MAGIC load_best_model_at_end=True, +-- MAGIC report_to="tensorboard" +-- MAGIC ) + +-- COMMAND ---------- + +-- MAGIC %python +-- MAGIC data_collator = transformers.DataCollatorForSeq2Seq( +-- MAGIC tokenizer, pad_to_multiple_of=8, return_tensors="pt", padding=True +-- MAGIC ) + +-- COMMAND ---------- + +-- MAGIC %python +-- MAGIC trainer = transformers.Trainer( +-- MAGIC model=model, +-- MAGIC 
train_dataset=train_data, +-- MAGIC eval_dataset=val_data, +-- MAGIC args=training_arguments, +-- MAGIC data_collator=data_collator +-- MAGIC ) +-- MAGIC model.config.use_cache = False +-- MAGIC old_state_dict = model.state_dict +-- MAGIC model.state_dict = ( +-- MAGIC lambda self, *_, **__: get_peft_model_state_dict( +-- MAGIC self, old_state_dict() +-- MAGIC ) +-- MAGIC ).__get__(model, type(model)) +-- MAGIC +-- MAGIC model = torch.compile(model) +-- MAGIC +-- MAGIC trainer.train() +-- MAGIC model.save_pretrained(OUTPUT_DIR) + +-- COMMAND ---------- + +-- MAGIC %python +-- MAGIC %load_ext tensorboard +-- MAGIC %tensorboard --logdir experiments/runs + +-- COMMAND ---------- + +-- MAGIC %python +-- MAGIC from huggingface_hub import notebook_login +-- MAGIC +-- MAGIC notebook_login() + +-- COMMAND ---------- + +-- MAGIC %python +-- MAGIC model.push_to_hub("curiousily/alpaca-bitcoin-tweets-sentiment", use_auth_token=True) + +-- COMMAND ---------- + +-- MAGIC %md +-- MAGIC ## Inference + +-- COMMAND ---------- + +-- MAGIC %python +-- MAGIC !git clone https://github.com/tloen/alpaca-lora.git +-- MAGIC %cd alpaca-lora +-- MAGIC !git checkout a48d947 + +-- COMMAND ---------- + +-- MAGIC %python +-- MAGIC !python generate.py \ +-- MAGIC --load_8bit \ +-- MAGIC --base_model 'decapoda-research/llama-7b-hf' \ +-- MAGIC --lora_weights 'curiousily/alpaca-bitcoin-tweets-sentiment' \ +-- MAGIC --share_gradio + +-- COMMAND ---------- + +-- MAGIC %md +-- MAGIC ## References +-- MAGIC - [Bitcoin Sentiment Dataset](https://www.kaggle.com/datasets/aisolutions353/btc-tweets-sentiment) diff --git a/(Clone) code/ang_jsl.sql b/(Clone) code/ang_jsl.sql new file mode 100644 index 0000000..6f9d153 --- /dev/null +++ b/(Clone) code/ang_jsl.sql @@ -0,0 +1,145 @@ +-- Databricks notebook source +select +count (*) +from uc_demos_angelina_leigh.nara_ang_demo.delta_med_icd10_filtered + +-- COMMAND ---------- + +select * from system.billing.usage where usage_metadata.cluster_id = '0601-182128-dcbte59m' and usage_date = '2023-11-03' + +-- COMMAND ---------- + +select * from john_snow_labs_icd_9_icd_10_and_clinical_classification_codes.icd_9_icd_10_and_clinical_classification_codes.clinical_classification_software_for_icd_10_cm + +-- COMMAND ---------- + +use catalog uc_demos_angelina_leigh; +use schema nara_ang_demo + +-- COMMAND ---------- + +--create table icd10_jsl as +select * +from + john_snow_labs_icd_9_icd_10_and_clinical_classification_codes.icd_9_icd_10_and_clinical_classification_codes.clinical_classification_software_for_icd_10_cm +group by + all +having count(ICD10CM_Code) >= 5 + +-- COMMAND ---------- + +select * from icd10_jsl + +-- COMMAND ---------- + +-- MAGIC %python +-- MAGIC (trainingData, testData) = df.randomSplit([0.7, 0.3], seed = 100) +-- MAGIC print("Training Dataset Count: " + str(trainingData.count())) +-- MAGIC print("Test Dataset Count: " + str(testData.count())) + +-- COMMAND ---------- + +-- MAGIC %python +-- MAGIC from pyspark.sql.functions import col +-- MAGIC +-- MAGIC trainingData.groupBy("protocol_labels") \ +-- MAGIC .count() \ +-- MAGIC .orderBy(col("count").desc()) \ +-- MAGIC .show() + +-- COMMAND ---------- + +-- MAGIC %python +-- MAGIC !pip install spark-nlp==5.1.3 + +-- COMMAND ---------- + +-- MAGIC %python +-- MAGIC import mlflow +-- MAGIC import mlflow.spark +-- MAGIC from pyspark.ml import Pipeline +-- MAGIC from sparknlp.base import * +-- MAGIC from sparknlp.annotator import * +-- MAGIC from pyspark.ml import Pipeline +-- MAGIC import pandas as pd +-- MAGIC import os +-- 
MAGIC +-- MAGIC with mlflow.start_run(run_name="JSL-Text-Classification") as run: +-- MAGIC # Define pipeline +-- MAGIC document_assembler = DocumentAssembler() \ +-- MAGIC .setInputCol("studydesc+procedurecode+reasonofstudy") \ +-- MAGIC .setOutputCol("document") +-- MAGIC +-- MAGIC tokenizer = Tokenizer() \ +-- MAGIC .setInputCols(["document"]) \ +-- MAGIC .setOutputCol("token") +-- MAGIC +-- MAGIC normalizer = Normalizer() \ +-- MAGIC .setInputCols(["token"]) \ +-- MAGIC .setOutputCol("normalized") +-- MAGIC +-- MAGIC stopwords_cleaner = StopWordsCleaner()\ +-- MAGIC .setInputCols("normalized")\ +-- MAGIC .setOutputCol("cleanTokens")\ +-- MAGIC .setCaseSensitive(False) +-- MAGIC +-- MAGIC lemma = LemmatizerModel.pretrained('lemma_antbnc') \ +-- MAGIC .setInputCols(["cleanTokens"]) \ +-- MAGIC .setOutputCol("lemma") +-- MAGIC +-- MAGIC glove_embeddings = WordEmbeddingsModel().pretrained() \ +-- MAGIC .setInputCols(["document",'lemma'])\ +-- MAGIC .setOutputCol("embeddings")\ +-- MAGIC .setCaseSensitive(False) +-- MAGIC +-- MAGIC embeddingsSentence = SentenceEmbeddings() \ +-- MAGIC .setInputCols(["document", "embeddings"]) \ +-- MAGIC .setOutputCol("sentence_embeddings") \ +-- MAGIC .setPoolingStrategy("AVERAGE") +-- MAGIC +-- MAGIC classsifierdl = ClassifierDLApproach()\ +-- MAGIC .setInputCols(["sentence_embeddings"])\ +-- MAGIC .setOutputCol("class")\ +-- MAGIC .setLabelColumn("protocol_labels")\ +-- MAGIC .setMaxEpochs(3)\ +-- MAGIC .setEnableOutputLogs(True) +-- MAGIC #.setOutputLogsPath('logs') +-- MAGIC +-- MAGIC clf_pipeline = Pipeline( +-- MAGIC stages=[ +-- MAGIC document_assembler, +-- MAGIC tokenizer, +-- MAGIC normalizer, +-- MAGIC stopwords_cleaner, +-- MAGIC lemma, +-- MAGIC glove_embeddings, +-- MAGIC embeddingsSentence, +-- MAGIC classsifierdl +-- MAGIC ] +-- MAGIC ) +-- MAGIC +-- MAGIC clf_pipelineModel = clf_pipeline.fit(trainingData) +-- MAGIC +-- MAGIC # Log parameters +-- MAGIC mlflow.log_param("label", "protocol_labels") +-- MAGIC mlflow.log_param("features", "studydesc+procedurecode+reasonofstudy") +-- MAGIC +-- MAGIC # Log model +-- MAGIC mlflow.spark.log_model(clf_pipelineModel, "model") +-- MAGIC +-- MAGIC # Evaluate predictions +-- MAGIC preds = clf_pipelineModel.transform(testData) +-- MAGIC preds.select('protocol_labels','studydesc+procedurecode+reasonofstudy',"class.result").show(10, truncate=80) + +-- COMMAND ---------- + +-- MAGIC %python +-- MAGIC preds_df = preds.select('protocol_labels','studydesc+procedurecode+reasonofstudy',"class.result").toPandas() +-- MAGIC preds_df['result'] = preds_df['result'].apply(lambda x : x[0]) + +-- COMMAND ---------- + +-- MAGIC %python +-- MAGIC from sklearn.metrics import classification_report +-- MAGIC +-- MAGIC print (classification_report(preds_df['protocol_labels'], preds_df['result'])) diff --git a/(Clone) code/mlflow_logging_ang.py b/(Clone) code/mlflow_logging_ang.py new file mode 100644 index 0000000..9938fd3 --- /dev/null +++ b/(Clone) code/mlflow_logging_ang.py @@ -0,0 +1,129 @@ +# Databricks notebook source +import mlflow +from transformers import AutoTokenizer, AutoModelForCausalLM + +# COMMAND ---------- + +!huggingface-cli login --token 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 + +# COMMAND ---------- + +tokenizer = AutoTokenizer.from_pretrained("mewtoo/RadLlama2") +model = AutoModelForCausalLM.from_pretrained("mewtoo/RadLlama2") + +# COMMAND ---------- + +import re +def trim_llm_output(text): + # Define a regular expression to match any punctuation + punctuation_regex = re.compile(r'[.,;!?://()]+') + + # 
Find the first occurrence of punctuation
+    match = punctuation_regex.search(text)
+
+    if match:
+        # Split the text at the first occurrence of punctuation
+        split_text = text[:match.end()-1]
+        return split_text
+    else:
+        # No punctuation found, return the original text
+        return text
+
+# COMMAND ----------
+
+from transformers import pipeline, logging
+
+def pred_wrapper(model, tokenizer, prompt, model_id=1, show_metrics=True, temp=0.1, max_length=100):
+    # Suppress Hugging Face pipeline logging
+    logging.set_verbosity(logging.CRITICAL)
+
+    # Initialize the text-generation pipeline
+    pipe = pipeline(task="text-generation",
+                    model=model,
+                    tokenizer=tokenizer,
+                    max_length=max_length,
+                    do_sample=True,
+                    temperature=temp)
+
+    # Generate text using the pipeline
+    result = pipe(f"[INST] {prompt} [/INST]")
+    generated_text = result[0]['generated_text']
+
+    # Find the index of the closing "[/INST] " tag in the generated text
+    index = generated_text.find("[/INST] ")
+    if index != -1:
+        # Extract the substring after "[/INST] "
+        substring_after_assistant = generated_text[index + len("[/INST] "):].strip()
+    else:
+        # If "[/INST] " is not found, use the entire generated text
+        substring_after_assistant = generated_text.strip()
+    substring_after_assistant = trim_llm_output(substring_after_assistant).strip()
+
+    if show_metrics:
+        # Calculate evaluation metrics (run_metrics is assumed to be defined elsewhere)
+        metrics = run_metrics(substring_after_assistant, prompt, model_id)
+        return substring_after_assistant, metrics
+    else:
+        return substring_after_assistant
+
+# COMMAND ----------
+
+def predict(prompt):
+    result = pred_wrapper(model, tokenizer, prompt, show_metrics=False)
+    return result
+
+# COMMAND ----------
+
+input_example = 'Reason: 55M motor vehicle accident, lower back pain. History: Trauma-related pain with limited mobility.'
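+
+# COMMAND ----------
+
+# Quick smoke test of the wrapper before logging it to MLflow -- a minimal
+# sketch assuming the RadLlama2 model and tokenizer cells above have already
+# run; the expected output is a short protocol label such as
+# "CT Dedicated Kidney".
+print(predict(input_example))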
+ +# COMMAND ---------- + +#define input and output format of model +from mlflow.models.signature import infer_signature +from mlflow.transformers import generate_signature_output +signature = infer_signature( + model_input=input_example, + model_output="CT Dedicated Kidney" +) + +# COMMAND ---------- + +import tempfile +import os + +temp_dir = tempfile.TemporaryDirectory() +tokenizer_path = os.path.join(temp_dir.name, "tokenizer") + +tokenizer.save_pretrained(tokenizer_path) + +model_path = os.path.join(temp_dir.name, "model") +model.save_pretrained(model_path) + +mlflow.set_registry_uri("databricks-uc") +mlflow.set_experiment("/Users/narasimha.kamathardi@databricks.com/Project: radiology label prediction using LLMs/code/05_mlflow_logging") + +with mlflow.start_run(): + mlflow.pyfunc.log_model( + "ang_nara_catalog.rad_llm.radllama2_7b", + python_model=predict, + artifacts={"tokenizer": tokenizer_path, "model": model_path}, + input_example=input_example, + signature=signature + ) + run_id = mlflow.active_run().info.run_id + catalog = "ang_nara_catalog" + schema = "rad_llm" + model_name = "radllama2_7b" + mlflow.set_registry_uri("databricks-uc") + mlflow.register_model( + model_uri="runs:/"+run_id+"/ang_nara_catalog.rad_llm.radllama2_7b", + name=f"{catalog}.{schema}.{model_name}") + +temp_dir.cleanup() diff --git a/(Clone) code/mlflow_logging_nara.py b/(Clone) code/mlflow_logging_nara.py new file mode 100644 index 0000000..da6de05 --- /dev/null +++ b/(Clone) code/mlflow_logging_nara.py @@ -0,0 +1,164 @@ +# Databricks notebook source +pip install mlflow --upgrade + +# COMMAND ---------- + +dbutils.library.restartPython() + +# COMMAND ---------- + +!huggingface-cli login --token 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 + +# COMMAND ---------- + +# Use a pipeline as a high-level helper +import os +from transformers import AutoTokenizer, AutoModelForCausalLM +tokenizer = AutoTokenizer.from_pretrained("RadiologyLLMs/RadLlama2-7b") +model = AutoModelForCausalLM.from_pretrained("RadiologyLLMs/RadLlama2-7b") +snapshot_location = os.path.expanduser("~/.cache/huggingface/model") +os.makedirs(snapshot_location, exist_ok=True) +model.save_pretrained(snapshot_location) + +# COMMAND ---------- + +import re +def trim_llm_output(text): + # Define a regular expression to match any punctuation + punctuation_regex = re.compile(r'[.,;!?:()<]+') + + # Find the first occurrence of punctuation + match = punctuation_regex.search(text) + + if match: + # Split the text at the first occurrence of punctuation + split_text = text[:match.end()-1] + return split_text + else: + # No punctuation found, return the original text + return text + +# COMMAND ---------- + +from transformers import pipeline +def pred_wrapper(model, tokenizer, prompt, model_id=1, show_metrics=True, temp=0.1, max_length=1): + pipe = pipeline(task="text-generation", + model=model, + tokenizer=tokenizer, + max_length=max_length, + do_sample=True, + temperature=temp) + + pipe = pipeline(task="text-generation", + model=model, + tokenizer=tokenizer, + max_length=100) + result = pipe(f"[INST] {prompt} [/INST]") + generated_text = result[0]['generated_text'] + + # Find the index of "### Assistant" in the generated text + index = generated_text.find("[/INST] ") + if index != -1: + # Extract the substring after "### Assistant" + substring_after_assistant = generated_text[index + len("[/INST] "):].strip() + substring_after_assistant = trim_llm_output(substring_after_assistant) + substring_after_assistant = substring_after_assistant.strip() + else: + # If "### 
Assistant" is not found, use the entire generated text
+        substring_after_assistant = generated_text.strip()
+        substring_after_assistant = trim_llm_output(substring_after_assistant)
+        substring_after_assistant = substring_after_assistant.strip()
+
+    if show_metrics:
+        # Calculate evaluation metrics (run_metrics is assumed to be defined elsewhere)
+        metrics = run_metrics(substring_after_assistant, prompt, model_id)
+        return substring_after_assistant, metrics
+    else:
+        return substring_after_assistant
+
+# COMMAND ----------
+
+def predict(prompt):
+    result = pred_wrapper(model, tokenizer, prompt, show_metrics=False)
+    return result
+
+# COMMAND ----------
+
+input_example = 'Notes: rule out renal recurrence History: Renal cell carcinoma, sp partial nephrectomy'
+
+# COMMAND ----------
+
+# Define the input and output format of the model
+from mlflow.models.signature import infer_signature
+from mlflow.transformers import generate_signature_output
+signature = infer_signature(
+    model_input=input_example,
+    model_output="CT Dedicated Kidney"
+)
+
+# COMMAND ----------
+
+import mlflow
+mlflow.set_registry_uri("databricks-uc")
+mlflow.set_experiment("/Users/narasimha.kamathardi@databricks.com/Project: radiology label prediction using LLMs/code/mlflow_logging_nara")
+
+with mlflow.start_run():
+    mlflow.pyfunc.log_model(
+        "ang_nara_catalog.rad_llm.radllama2_7b_test",
+        python_model=predict,
+        artifacts={"repository": snapshot_location},
+        input_example=input_example,
+        signature=signature
+    )
+    run_id = mlflow.active_run().info.run_id
+    catalog = "ang_nara_catalog"
+    schema = "rad_llm"
+    model_name = "radllama2_7b_test"
+    mlflow.register_model(
+        model_uri="runs:/"+run_id+"/ang_nara_catalog.rad_llm.radllama2_7b_test",
+        name=f"{catalog}.{schema}.{model_name}")
+
+# COMMAND ----------
+
+import mlflow
+logged_model = 'runs:/687dfd691f4f4767a974c960faaa0948/ang_nara_catalog.rad_llm.radllama2_7b_test'
+
+# Load model as a PyFuncModel.
+loaded_model = mlflow.pyfunc.load_model(logged_model)
+
+data = {"clinical_notes": ["Notes: evaluate liver lesions, masses, HCC, possible thrombus, aberrant anatomy, ascites History: HCC, S/P TARE"]}
+
+# Predict on a Pandas DataFrame.
+import pandas as pd
+loaded_model.predict(pd.DataFrame(data))
+
+# COMMAND ----------
+
+import mlflow
+logged_model = 'runs:/52d1cced2b6d46fd9fea04bfb3dae587/ang_nara_catalog.rad_llm.radllama2_7b_test'
+
+# Load model as a PyFuncModel.
+loaded_model = mlflow.pyfunc.load_model(logged_model)
+
+data = {"clinical_notes": ["Notes: 20M with bladder extrophy with hx of urolithiasis, please perform Low Dose CT for kidney and bladder stone surveillance History: none"]}
+
+# Predict on a Pandas DataFrame.
+import pandas as pd
+loaded_model.predict(pd.DataFrame(data))
+
+# COMMAND ----------
+
+import mlflow
+logged_model = 'runs:/550b8e7a15fb4996a4b40b4bc4b362b1/ang_nara_catalog.rad_llm.radllama2_7b_test'
+
+# Load model as a PyFuncModel.
+loaded_model = mlflow.pyfunc.load_model(logged_model)
+
+data = {"clinical_notes": ["Notes: rule out renal recurrence or metastasis History: hx of renal cell carcinoma sp partial nephrectomy"]}
+
+# Predict on a Pandas DataFrame.
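+# (The runs above are loaded by run ID, which is specific to the author's
+# workspace. Because the model is also registered in Unity Catalog, an
+# equivalent, more portable load is by registered name -- for example, with an
+# illustrative version number:
+# mlflow.pyfunc.load_model("models:/ang_nara_catalog.rad_llm.radllama2_7b_test/1"))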
+import pandas as pd
+loaded_model.predict(pd.DataFrame(data))

diff --git a/(Clone) code/model_serving.py b/(Clone) code/model_serving.py
new file mode 100644
index 0000000..89b71d4
--- /dev/null
+++ b/(Clone) code/model_serving.py
@@ -0,0 +1,74 @@
+# Databricks notebook source
+import os
+import pandas as pd
+import requests
+import json
+from transformers import pipeline
+import mlflow
+from mlflow.models import infer_signature
+from mlflow.transformers import generate_signature_output
+from mlflow.tracking import MlflowClient
+
+# COMMAND ----------
+
+mlflow.set_registry_uri('databricks-uc')
+
+# COMMAND ----------
+
+import requests
+import json
+
+# Set the name of the MLflow endpoint
+endpoint_name = "radllama2_7b"
+
+# Name of the registered MLflow model
+model_name = "ang_nara_catalog.rad_llm.radllama2_7b_test"
+
+# Version of the registered MLflow model to serve
+model_version = 2
+
+# Specify the type of compute (CPU, GPU_SMALL, GPU_MEDIUM, etc.)
+workload_type = "GPU_MEDIUM"
+
+# Specify the scale-out size of compute (Small, Medium, Large, etc.)
+workload_size = "Small"
+
+# Specify Scale to Zero (only supported for CPU endpoints)
+scale_to_zero = False
+
+# Get the API endpoint and token for the current notebook context
+API_ROOT = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get()
+API_TOKEN = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()
+
+# Send the POST request to create the serving endpoint
+data = {
+    "name": endpoint_name,
+    "config": {
+        "served_models": [
+            {
+                "model_name": model_name,
+                "model_version": model_version,
+                "workload_size": workload_size,
+                "scale_to_zero_enabled": scale_to_zero,
+                "workload_type": workload_type,
+            }
+        ]
+    },
+}
+
+headers = {"Content-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}"}
+
+response = requests.post(
+    url=f"{API_ROOT}/api/2.0/serving-endpoints", json=data, headers=headers
+)
+
+print(json.dumps(response.json(), indent=4))
+
+# COMMAND ----------
+
+
+
+# COMMAND ----------
+
+

diff --git a/(Clone) code/model_serving_test.py b/(Clone) code/model_serving_test.py
new file mode 100644
index 0000000..819ae65
--- /dev/null
+++ b/(Clone) code/model_serving_test.py
@@ -0,0 +1,74 @@
+# Databricks notebook source
+import os
+import pandas as pd
+import requests
+import json
+from transformers import pipeline
+import mlflow
+from mlflow.models import infer_signature
+from mlflow.transformers import generate_signature_output
+from mlflow.tracking import MlflowClient
+
+# COMMAND ----------
+
+mlflow.set_registry_uri('databricks-uc')
+
+# COMMAND ----------
+
+import requests
+import json
+
+# Set the name of the MLflow endpoint
+endpoint_name = "radllama2_7b"
+
+# Name of the registered MLflow model
+model_name = "ang_nara_catalog.rad_llm.radllama2_7b_test"
+
+# Version of the registered MLflow model to serve
+model_version = 3
+
+# Specify the type of compute (CPU, GPU_SMALL, GPU_MEDIUM, etc.)
+workload_type = "GPU_MEDIUM"
+
+# Specify the scale-out size of compute (Small, Medium, Large, etc.)
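+# (Note: scale_to_zero below is left False because this endpoint uses a GPU
+# workload type, and scale-to-zero is only supported for CPU endpoints.)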
+workload_size = "Small" + +# Specify Scale to Zero (only supported for CPU endpoints) +scale_to_zero = False + +# Get the API endpoint and token for the current notebook context +API_ROOT = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get() +API_TOKEN = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get() + +# send the POST request to create the serving endpoint + +data = { + "name": endpoint_name, + "config": { + "served_models": [ + { + "model_name": model_name, + "model_version": model_version, + "workload_size": workload_size, + "scale_to_zero_enabled": scale_to_zero, + "workload_type": workload_type, + } + ] + }, +} + +headers = {"Context-Type": "text/json", "Authorization": f"Bearer {API_TOKEN}"} + +response = requests.post( + url=f"{API_ROOT}/api/2.0/serving-endpoints", json=data, headers=headers +) + +print(json.dumps(response.json(), indent=4)) + +# COMMAND ---------- + + + +# COMMAND ---------- + + diff --git a/00_[PLEASE READ] Contributing to Solution Accelerators.py b/00_[PLEASE READ] Contributing to Solution Accelerators.py deleted file mode 100644 index 7d43126..0000000 --- a/00_[PLEASE READ] Contributing to Solution Accelerators.py +++ /dev/null @@ -1,127 +0,0 @@ -# Databricks notebook source -# MAGIC %md -# MAGIC -# MAGIC -# MAGIC # Contributing to [Industry Solution Accelerators](https://www.databricks.com/solutions/accelerators) - the Field Guide -# MAGIC -# MAGIC Thank you for your interest in contributing to solution accelerators! Solution accelerator are Databricks' repository to host reusable technical assets for industry technical challenges and business use cases. The program is run by Sales GTM Verticals and supported by field contribution. -# MAGIC -# MAGIC The purpose of this notebook is to describe the process for contributing to accelerators and provide helpful checklists for important milestones: intake, commit, standardization and publication. Hopefully this checklist will be useful for first-time and repeat contributors alike. -# MAGIC -# MAGIC -# MAGIC -# MAGIC ___ -# MAGIC Maintainer: [@nicole.lu](https://databricks.enterprise.slack.com/team/jingting_lu) -# MAGIC ___ -# MAGIC -# MAGIC ## Intake -# MAGIC ❓ If you brought your own code, can you summarize what problem your code solves in less than 100 words? -# MAGIC * Does it tackle an industry **business use case**, or a common industry **technical challenge** -# MAGIC -# MAGIC ❓ Have you discussed the topic with a Technical Director? If you are not sure which vertical your work is best suited for, contact [@nicole.lu](https://databricks.enterprise.slack.com/team/jingting_lu) for an intake consultation. -# MAGIC -# MAGIC **The Technical Directors will approve the accelerator and place it on a publication roadmap for their industry.** The Technical Directors are: -# MAGIC * Retail CPG: Bryan Smith -# MAGIC * Financial Services: Antoine Amend, Eon Retief -# MAGIC * Media Entertainment: Dan Morris -# MAGIC * Health Life Sciense: Amir Kermany, Aaron Zarova -# MAGIC * Manufacturing: Bala Amavasai -# MAGIC * Cyber Security: Lipyeow Lim -# MAGIC * Public Sector: No Technical Director but Field Eng owns content curation and development. Reach out to Milos Colic -# MAGIC -# MAGIC ❓ Do we have the rights to use the source datasets and libraries in your code? -# MAGIC - Please fill out the dependency-license table in the README. Make sure our dependencies are **permissive** open source. Permissive open source licenses include MIT, Apache and BSD. 
See `go/opensource` for more details. -# MAGIC - If we need to use some written documentation to substantiate our rights to use any idea, data or code dependency, file a legal review ticket -# MAGIC - If you need to synthesize and store some source data, use a publically accessible cloud storage, such as `s3://db-gtm-industry-solutions/data/` -# MAGIC -# MAGIC ❓ Is the code reusable by a broad array of customers? No customer-specific implementation details please. -# MAGIC -# MAGIC ❓ Do you know the scope of work for this accelerator? -# MAGIC * At the minimum, you are responsible for making the code in the repo to tell a cohesive story -# MAGIC * You may need to provide a blog post, video recording or slides. The technical director will discuss and decide which **publishing tier** the accelerator will be launched at. The **publishing tier** determines the full scope and the list of final deliverables for the accelerator. Higher tiers may require a blog post, slides, video recording and more. The industry vertical will lean in with marketing resources if they decide to publish the accelerator at a higher tier 💪 -# MAGIC -# MAGIC -# MAGIC ___ -# MAGIC -# MAGIC -# MAGIC ## Commit: Before the First Code Review -# MAGIC -# MAGIC ❓ Do you know how you will be collaborating with reviewers and other contributors on this code? -# MAGIC * You may collaborate with the reviewer/contributor in the same workspace -# MAGIC * You may also receive a repo for the accelerator in https://github.com/databricks-industry-solutions to collaborate on via pull requests -# MAGIC -# MAGIC ❓ Do we have rights to use the source data and dependencies? Do we need to host data? -# MAGIC - Please fill out the dependency-license table in the README. Make sure our dependencies are open source. If we need to use some written documentation to substantiate our rights to use any data or code dependency, file an LPP (legal review) ticket -# MAGIC - If you need to synthesize and store some source data, use a publically accessible cloud storage, such as `s3://db-gtm-industry-solutions/data/` -# MAGIC -# MAGIC ❓ Does the code contain any credentials? If yes, **scrub** the credentials from your code. Contact [@nicole.lu](https://databricks.enterprise.slack.com/team/jingting_lu) to set up secrets in demo and testing workspaces. Prepare a short paragraph describing how the user would set up the dependencies and collect their own credentials -# MAGIC -# MAGIC ❓ Have you explored https://github.com/databricks-industry-solutions/industry-solutions-blueprints? This repo illustrates a compulsory directory standard. All new accelerator repos are created with this template in place. If you are provided a repo for collaboration, please commit your code according to this template. -# MAGIC -# MAGIC - **Narrative notebooks** are stored on the top level and **numbered**. -# MAGIC - **The RUNME notebook** is the entry point of your accelerator. It creates the job and clusters your user will use to run the notebooks, acting as the definition of the integration test for this accelerator. All published solution accelerator run nightly integration tests -# MAGIC - **Util and configuration notebooks** can be stored `./util` and `./config` directories. Example util notebooks for common tasks such as **preparing source data** and **centralizing configuration** are available in this repo and they are reused in almost every accelerator. You can save time by modifying and reusing these standard components. 
-# MAGIC - **Dashboards** can be saved in `./dashboard` directory and created in the `RUNME` notebook. See an example in the `RUNME` notebook in this repository. The dashboard import feature is in private preview and enabled on the [e2-demo-field-eng workspace](https://e2-demo-field-eng.cloud.databricks.com/?o=1444828305810485). -# MAGIC - **Images and other arbitrary files** can be stored in `./images/` and `./resources/` directories if they are not large (less than 1 mb). Imagines can be embedded via its Github url, which will work once the repository is made public. Do not use relative paths like `./images/image.png` in either notebooks or the README.md file - see the images throughout this notebook for examples. Larger resources can be stored in a public storage account, such as , such as `s3://db-gtm-industry-solutions/` -# MAGIC -# MAGIC ___ -# MAGIC -# MAGIC ## Standardization: Before Reviewing with Technical Directors and other Collaborators -# MAGIC -# MAGIC ❓ Have you read a few accelerators and familiarized with the style? Here are some great recent examples: [IOC matching accelerator](https://github.com/databricks-industry-solutions/ioc-matching) from Cyber Security, [Pixels accelerator](https://github.com/databricks-industry-solutions/pixels) from HLS, [ALS Recommender accelerator](https://github.com/databricks-industry-solutions/als-recommender) from RCG. -# MAGIC -# MAGIC ❓ Have you tested the code end-to-end? -# MAGIC * Set up the multi-task job in `RUNME` by modifying the sample job json - the job defines the workflow you intend the user to run in their own workspace -# MAGIC * Run the RUNME notebook to generate your accelerator workflow. Run the workflow end-to-end to show that all code runs for the accelerator. -# MAGIC * Create a [**pull request**](https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/proposing-changes-to-your-work-with-pull-requests/about-pull-requests) from your branch into main. The creation of pull request triggers integration tests in multiple workspaces. [Example](https://github.com/databricks-industry-solutions/media-mix-modeling/actions) -# MAGIC -# MAGIC ❓ Have you resolved all integration test errors? If you have issues seeing integration run histories for debugging, slack [@nicole.lu](https://databricks.enterprise.slack.com/team/jingting_lu) for help. -# MAGIC -# MAGIC ___ -# MAGIC -# MAGIC ## Publication: Before the Content is Made Publically Visible -# MAGIC -# MAGIC Accelerators must be reviewed with the sponsoring Technical Director and 1 other technical SME. The SME can be a lead of the SME groups (ML-SME etc) or a vertical lead SA. -# MAGIC -# MAGIC ❓ Have you resolved all integration test errors? -# MAGIC -# MAGIC ❓ Does your accelerator have in-depth discussion with at least one focus: **business use case**, **industry technical challenge** or both -# MAGIC -# MAGIC ❓ Does the notebook(s) explain the business use case and the technical pattern via sufficient Markdowns? -# MAGIC -# MAGIC ❓ Did you work with the Industry Marketers to publish other marketing assets such as blogs? -# MAGIC * RCG, MFG: Sam Steiny -# MAGIC * FSI, Cyber Security: Anna Cuisia -# MAGIC * CME: Bryan Saftler -# MAGIC * HLS: Adam Crown -# MAGIC * Public Sector: Lisa Sion -# MAGIC -# MAGIC --- -# MAGIC -# MAGIC If your answers are yes to all the above ... -# MAGIC ## 🍻 Congratulations! You have successfully published a solution accelerator. 
-# MAGIC -# MAGIC Your thought leadership -# MAGIC * Is visible on the Databricks [website](https://www.databricks.com/solutions/accelerators) -# MAGIC * May be showcased on our Marketplace -# MAGIC * May be used in training material -# MAGIC * Maybe implemented by our Professional Services, Cloud Partners, SIs and have many more channels of influence. -# MAGIC -# MAGIC ___ -# MAGIC -# MAGIC ## Maintenance, Feedback and Continued Improvement -# MAGIC ❗ If you know of a customer who benefited from an accelerator, you or the account team should fill out the customer use capture form [here](https://docs.google.com/forms/d/1Seo5dBNYsLEK7QgZ1tzPvuA9rxXxr1Sh_2cwu9hM9gM/edit) 📋 -# MAGIC -# MAGIC ❗ You can track which Customer Accounts imported your accelerator if you have [logfood](https://adb-2548836972759138.18.azuredatabricks.net/sql/dashboards/b85f5b93-2e4c-40ee-92fd-9b30d1d8a659?o=2548836972759138#) access. 📈 -# MAGIC -# MAGIC ❗ [@nicole.lu](https://databricks.enterprise.slack.com/team/jingting_lu) may reach out for help if some hard-to-resolve bugs arose from nightly testing 🪲 -# MAGIC -# MAGIC ❗ Users may open issues to ask questions about the accelerator. Users may also contribute to solution accelerators as long as they accept our Contributing License Agreement. We have an automated process in place and the external collaborator can accept the Contributing License Agreement on their own. 🤝 - -# COMMAND ---------- - - - -# COMMAND ---------- - - diff --git a/00_introduction.py b/00_introduction.py new file mode 100644 index 0000000..cdad790 --- /dev/null +++ b/00_introduction.py @@ -0,0 +1,2 @@ +# Databricks notebook source + diff --git a/01_data_preparation.py b/01_data_preparation.py new file mode 100644 index 0000000..c752520 --- /dev/null +++ b/01_data_preparation.py @@ -0,0 +1,44 @@ +# Databricks notebook source +import pyspark.pandas as ps +import pyspark.sql.utils +import pandas as pd +import re +import dlt +from pyspark.sql.functions import * +ps.set_option('compute.ops_on_diff_frames', True) + +# COMMAND ---------- + +@dlt.table +def load_data(): + data = pd.read_csv( + "//Volumes/ang_nara_catalog/rad_llm/clinical_data/12k_handwritten_clinical_notes.csv" + ) + data = data.drop(['Unnamed: 0'], axis=1) + return spark.createDataFrame(data) + + +# COMMAND ---------- + +@dlt.table +def remove_label_counts_less_than_50(): + df = dlt.read('load_data') + df.write.format("delta").mode("overwrite").option("overwriteSchema",True).saveAsTable("ang_nara_catalog.rad_llm.delta_rad") + df = spark.sql(""" + SELECT t.input, t.radiology_labels + FROM ( + SELECT t.*, COUNT(*) OVER (PARTITION BY radiology_labels) AS cnt + FROM ang_nara_catalog.rad_llm.delta_rad t + ) t + WHERE cnt > 50 +""") + return df + +# COMMAND ---------- + +@dlt.table +def filtered_table(): + df = dlt.read('remove_label_counts_less_than_50') + df = df.withColumn("instruction", lit('predict radiology label for the clinical notes')) + df.write.format("delta").mode("overwrite").option("overwriteSchema",True).saveAsTable("ang_nara_catalog.rad_llm.delta_rad_filtered") + return df diff --git a/02_create_monitor.py b/02_create_monitor.py new file mode 100644 index 0000000..d30197e --- /dev/null +++ b/02_create_monitor.py @@ -0,0 +1,43 @@ +# Databricks notebook source +# MAGIC %pip install "https://ml-team-public-read.s3.amazonaws.com/wheels/data-monitoring/a4050ef7-b183-47a1-a145-e614628e3146/databricks_lakehouse_monitoring-0.3.6-py3-none-any.whl" + +# COMMAND ---------- + +dbutils.library.restartPython() + +# COMMAND ---------- + 
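+# The next cell creates a Databricks Lakehouse Monitoring monitor on the
+# filtered training table. The Snapshot profile re-profiles the full table on
+# every refresh, and the computed metric tables are written to the schema
+# passed as output_schema_name.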
+from databricks import lakehouse_monitoring as lm +info = lm.create_monitor( + table_name=f"ang_nara_catalog.rad_llm.delta_rad_filtered", + profile_type=lm.Snapshot(), + output_schema_name=f"ang_nara_catalog.rad_llm" +) + +# COMMAND ---------- + +import time + +# Wait for monitor to be created +while info.status == lm.MonitorStatus.PENDING: + info = lm.get_monitor(table_name=f"ang_nara_catalog.rad_llm.delta_rad_filtered") + time.sleep(10) + +assert(info.status == lm.MonitorStatus.ACTIVE) + +# COMMAND ---------- + +# A metric refresh will automatically be triggered on creation +refreshes = lm.list_refreshes(table_name=f"ang_nara_catalog.rad_llm.delta_rad_filtered") +assert(len(refreshes) > 0) + +run_info = refreshes[0] +while run_info.state in (lm.RefreshState.PENDING, lm.RefreshState.RUNNING): + run_info = lm.get_refresh(table_name=f"ang_nara_catalog.rad_llm.delta_rad_filtered", refresh_id=run_info.refresh_id) + time.sleep(30) + +assert(run_info.state == lm.RefreshState.SUCCESS) + +# COMMAND ---------- + +lm.get_monitor(table_name=f"ang_nara_catalog.rad_llm.delta_rad_filtered") diff --git a/03_train_llm.py b/03_train_llm.py new file mode 100644 index 0000000..e333b38 --- /dev/null +++ b/03_train_llm.py @@ -0,0 +1,230 @@ +# Databricks notebook source +#install libraries +!pip install -q accelerate==0.21.0 peft==0.4.0 bitsandbytes==0.40.2 transformers==4.31.0 trl==0.4.7 + +# COMMAND ---------- + +# MAGIC %sql +# MAGIC USE CATALOG ang_nara_catalog + +# COMMAND ---------- + +# MAGIC %sql +# MAGIC CREATE VOLUME IF NOT EXISTS ang_nara_catalog.rad_llm.results + +# COMMAND ---------- + +#import libraries +import os +import torch +from datasets import load_dataset +from transformers import ( + AutoModelForCausalLM, + AutoTokenizer, + BitsAndBytesConfig, + HfArgumentParser, + TrainingArguments, + pipeline, + logging, +) +from peft import LoraConfig, PeftModel, get_peft_model +from trl import SFTTrainer + +# COMMAND ---------- + +#PEFT LORA configurations definitions used for multi-gpu +local_rank = -1 +per_device_train_batch_size = 4 +per_device_eval_batch_size = 4 +gradient_accumulation_steps = 1 +learning_rate = 2e-4 +model_name = 'epfl-llm/meditron-7b' +max_grad_norm = 0.3 +weight_decay = 0.001 +lora_alpha = 16 +lora_dropout = 0.1 +lora_r = 64 +max_seq_length = None + +# COMMAND ---------- + +use_4bit = True #enable QLORA +use_nested_quant = False +bnb_4bit_compute_dtype = "float16" +bnb_4bit_quant_type = "nf4" #Quantization type (fp4 or nf4) +fp16 = False #Disable fp16 to False as we are using 4-bit precision QLORA +bf16 = False #Disable bf16 to False as we are using 4-bit precision QLORA +packing = False +gradient_checkpointing = True #Enable gradient checkpoint +optim = "paged_adamw_32bit" #Optimizer used for weight updates +lr_scheduler_type = "cosine" #The cosine lrs function has been shown to perform better than alternatives like simple linear annealing in practice. 
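+#Effective global batch size = per_device_train_batch_size (4) x gradient_accumulation_steps (1) x number of GPUs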
+max_steps = -1 #Number of optimizer update steps +warmup_ratio = 0.2 #Define training warmup fraction +group_by_length = True #Group sequences into batches with same length (saves memory and speeds up training considerably) +save_steps = 800 #Save checkpoint every X updates steps +logging_steps = 800 #Log every X updates steps +output_dir = "/Volumes/ang_nara_catalog/rad_llm/results" +device_map = {"": 0} + +# COMMAND ---------- + +def load_model(model_name): + """ + Function to load the LLM model weights, peft config, and tokenizer from HF + """ + compute_dtype = getattr(torch, bnb_4bit_compute_dtype) + + #Quantization config for QLORA + bnb_config = BitsAndBytesConfig( + load_in_4bit=use_4bit, + bnb_4bit_quant_type=bnb_4bit_quant_type, + bnb_4bit_compute_dtype=compute_dtype, + bnb_4bit_use_double_quant=use_nested_quant, + ) + #Model + model = AutoModelForCausalLM.from_pretrained( + model_name, + device_map=device_map, + quantization_config=bnb_config + ) + + #Turn off cache to use the updated model params + model.config.use_cache = False + + #This value is necessary to ensure exact reproducibility of the pretraining results + model.config.pretraining_tp = 1 + + #LORA Config + peft_config = LoraConfig( + lora_alpha=lora_alpha, #Controls LORA scaling; higher value makes the approximation more influencial + lora_dropout=lora_dropout, #Probability that each neuron's output set to 0; prevents overfittig + r=lora_r, #LORA rank param; lower value makes model faster but sacrifices performance + bias="none",#For performance, we recommend setting bias to none first, and then lora_only, before trying all + task_type="CAUSAL_LM", + ) + #Tokenizer + tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True) + tokenizer.pad_token = tokenizer.eos_token + tokenizer.padding_side = "right" + + return model, tokenizer, peft_config + +# COMMAND ---------- + +!huggingface-cli login --token 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 + +# COMMAND ---------- + +model, tokenizer, peft_config = load_model(model_name) + +# COMMAND ---------- + +df = spark.sql("SELECT * FROM ang_nara_catalog.rad_llm.delta_rad_filtered") +df = df.toPandas() + +# COMMAND ---------- + +#Generate list of dictionaries +dataset_data = [ + { + "instruction": "predict radiology labels for the clinical notes", + "clinical_notes": row_dict["input"], + "radiology_labels": row_dict["radiology_labels"] + } + for row_dict in df.to_dict(orient="records") +] + +# COMMAND ---------- + +#Write dictionary list to a json file +import json +with open("/Volumes/ang_nara_catalog/rad_llm/clinical_data/filtered_clinical_notes.json", "w") as f: + json.dump(dataset_data, f) + +# COMMAND ---------- + +def format_rad(sample): + """ + Function to create dataset as per Llama2 prompt format + """ + instruction = f"[INST] {sample['instruction']}" + context = f"Here's some context: {sample['clinical_notes']}" if len(sample["clinical_notes"]) > 0 else None + response = f" [/INST] {sample['radiology_labels']}" + #Join all the parts together + prompt = "".join([i for i in [instruction, context, response] if i is not None]) + return prompt + +def template_dataset(sample): + """ + Function to apply Llama2 prompt format on the entire dataset + """ + sample["text"] = f"{format_rad(sample)}{tokenizer.eos_token}" + return sample + +#Apply prompt template per sample +dataset = load_dataset("json", data_files="/Volumes/ang_nara_catalog/rad_llm/clinical_data/filtered_clinical_notes.json", split="train") + +# Shuffle the dataset +dataset_shuffled = 
dataset.shuffle(seed=42) +dataset = dataset.map(template_dataset, remove_columns=list(dataset.features)) +dataset + +# COMMAND ---------- + +#Model training +training_arguments = TrainingArguments( + output_dir=output_dir, + per_device_train_batch_size=per_device_train_batch_size, + gradient_accumulation_steps=gradient_accumulation_steps, + optim=optim, + save_steps=save_steps, + logging_steps=logging_steps, + learning_rate=learning_rate, + fp16=fp16, + bf16=bf16, + max_grad_norm=max_grad_norm, + max_steps=max_steps, + warmup_ratio=warmup_ratio, + group_by_length=group_by_length, + lr_scheduler_type=lr_scheduler_type, + ddp_find_unused_parameters=False, +) + +trainer = SFTTrainer( + model=model, + train_dataset=dataset, + peft_config=peft_config, + dataset_text_field="text", + max_seq_length=max_seq_length, + tokenizer=tokenizer, + args=training_arguments, + packing=packing, +) + +trainer.train() +trainer.model.save_pretrained(output_dir) + +# COMMAND ---------- + +# Reload model in FP16 and merge it with LoRA weights +# To merge the model weights in HF, restart compute, run the first 6 cells, and run this cell + +base_model = AutoModelForCausalLM.from_pretrained( + model_name, + low_cpu_mem_usage=True, + return_dict=True, + torch_dtype=torch.float16, + device_map=device_map, +) +model = PeftModel.from_pretrained(base_model, output_dir) +model = model.merge_and_unload() + +# Reload tokenizer to save it +tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True) +tokenizer.pad_token = tokenizer.eos_token +tokenizer.padding_side = "right" + +# COMMAND ---------- + +model.push_to_hub("RadiologyLLMs/RadLlama2-7b", use_auth_token=True, create_pr=1, max_shard_size='20GB') +tokenizer.push_to_hub("RadiologyLLMs/RadLlama2-7b", use_auth_token=True, create_pr=1) diff --git a/04_model_prediction_premlflow.py b/04_model_prediction_premlflow.py new file mode 100644 index 0000000..3c70f25 --- /dev/null +++ b/04_model_prediction_premlflow.py @@ -0,0 +1,119 @@ +# Databricks notebook source +#install libraries +!pip install -q accelerate==0.21.0 peft==0.4.0 bitsandbytes==0.40.2 transformers==4.31.0 trl==0.4.7 + +# COMMAND ---------- + +#import libraries +import os +import torch +from datasets import load_dataset +from transformers import ( + AutoModelForCausalLM, + AutoTokenizer, + BitsAndBytesConfig, + HfArgumentParser, + TrainingArguments, + pipeline, + logging, +) +from peft import LoraConfig, PeftModel, get_peft_model +from trl import SFTTrainer + +# COMMAND ---------- + +!huggingface-cli login --token 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 + +# COMMAND ---------- + +import os +from transformers import AutoTokenizer, AutoModelForCausalLM +tokenizer = AutoTokenizer.from_pretrained("RadiologyLLMs/RadLlama2-7b") +model = AutoModelForCausalLM.from_pretrained("RadiologyLLMs/RadLlama2-7b") + +# COMMAND ---------- + +import re +def trim_llm_output(text): + # Define a regular expression to match any punctuation + punctuation_regex = re.compile(r'[.,;!?:()<]+') + + # Find the first occurrence of punctuation + match = punctuation_regex.search(text) + + if match: + # Split the text at the first occurrence of punctuation + split_text = text[:match.end()-1] + return split_text + else: + # No punctuation found, return the original text + return text + +# COMMAND ---------- + +from transformers import pipeline +def pred_wrapper(model, tokenizer, prompt, model_id=1, show_metrics=True, temp=0.1, max_length=1): + pipe = pipeline(task="text-generation", + model=model, + tokenizer=tokenizer, + 
max_length=max_length, + do_sample=True, + temperature=temp) + + pipe = pipeline(task="text-generation", + model=model, + tokenizer=tokenizer, + max_length=100) + result = pipe(f"[INST] {prompt} [/INST]") + generated_text = result[0]['generated_text'] + + # Find the index of "### Assistant" in the generated text + index = generated_text.find("[/INST] ") + if index != -1: + # Extract the substring after "### Assistant" + substring_after_assistant = generated_text[index + len("[/INST] "):].strip() + substring_after_assistant = trim_llm_output(substring_after_assistant) + substring_after_assistant = substring_after_assistant.strip() + else: + # If "### Assistant" is not found, use the entire generated text + substring_after_assistant = generated_text.strip() + substring_after_assistant = trim_llm_output(substring_after_assistant) + substring_after_assistant = substring_after_assistant.strip() + + if show_metrics: + # Calculate evaluation metrics + metrics = run_metrics(substring_after_assistant, prompt, model_id) + + return substring_after_assistant, metrics + else: + return substring_after_assistant + + +# COMMAND ---------- + +import pandas as pd +df_test = pd.read_csv('/Volumes/ang_nara_catalog/rad_llm/clinical_data/test_rad_dataset.csv') + +# COMMAND ---------- + +prediction = list() +for index, row in df_test.iterrows(): + prompt = row['clinical_notes'] + pred = pred_wrapper(model, tokenizer, prompt, show_metrics=False) + prediction.append(pred) +df_test['prediction'] = prediction +spark_df_test = spark.createDataFrame(df_test) + +# COMMAND ---------- + +#drop index column +spark_df_test = spark_df_test.drop("Unnamed: 0") + +# COMMAND ---------- + +spark_df_test.write.format("delta").mode("overwrite").saveAsTable("ang_nara_catalog.rad_llm.rad_pred_premlflow") + +# COMMAND ---------- + +# MAGIC %sql +# MAGIC select * from ang_nara_catalog.rad_llm.rad_pred_premlflow diff --git a/05_mlflow_logging.py b/05_mlflow_logging.py new file mode 100644 index 0000000..ab25810 --- /dev/null +++ b/05_mlflow_logging.py @@ -0,0 +1,135 @@ +# Databricks notebook source +pip install mlflow --upgrade + +# COMMAND ---------- + +dbutils.library.restartPython() + +# COMMAND ---------- + +import mlflow +from transformers import AutoTokenizer, AutoModelForCausalLM + +# COMMAND ---------- + +!huggingface-cli login --token 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 + +# COMMAND ---------- + +tokenizer = AutoTokenizer.from_pretrained("RadiologyLLMs/RadLlama2-7b") +model = AutoModelForCausalLM.from_pretrained("RadiologyLLMs/RadLlama2-7b") + +# COMMAND ---------- + +import re +def trim_llm_output(text): + # Define a regular expression to match any punctuation + punctuation_regex = re.compile(r'[.,;!?:()<]+') + + # Find the first occurrence of punctuation + match = punctuation_regex.search(text) + + if match: + # Split the text at the first occurrence of punctuation + split_text = text[:match.end()-1] + return split_text + else: + # No punctuation found, return the original text + return text + +# COMMAND ---------- + +from transformers import pipeline +def pred_wrapper(model, tokenizer, prompt, model_id=1, show_metrics=True, temp=0.1, max_length=1): + # Initialize the pipeline + pipe = pipeline(task="text-generation", + model=model, + tokenizer=tokenizer, + max_length=max_length, + do_sample=True, + temperature=temp) + + # Generate text using the pipeline + pipe = pipeline(task="text-generation", + model=model, + tokenizer=tokenizer, + max_length=100) + result = pipe(f"[INST] {prompt} [/INST]") + generated_text = 
diff --git a/05_mlflow_logging.py b/05_mlflow_logging.py
new file mode 100644
index 0000000..ab25810
--- /dev/null
+++ b/05_mlflow_logging.py
@@ -0,0 +1,135 @@
+# Databricks notebook source
+pip install mlflow --upgrade
+
+# COMMAND ----------
+
+dbutils.library.restartPython()
+
+# COMMAND ----------
+
+import mlflow
+from transformers import AutoTokenizer, AutoModelForCausalLM
+
+# COMMAND ----------
+
+# Read the Hugging Face token from a secret scope rather than hardcoding it
+# (the scope and key names below are placeholders; adjust them to your workspace)
+hf_token = dbutils.secrets.get("solution-accelerator-cicd", "huggingface-token")
+!huggingface-cli login --token $hf_token
+
+# COMMAND ----------
+
+tokenizer = AutoTokenizer.from_pretrained("RadiologyLLMs/RadLlama2-7b")
+model = AutoModelForCausalLM.from_pretrained("RadiologyLLMs/RadLlama2-7b")
+
+# COMMAND ----------
+
+import re
+def trim_llm_output(text):
+    # Match the first run of punctuation in the generated text
+    punctuation_regex = re.compile(r'[.,;!?:()<]+')
+    match = punctuation_regex.search(text)
+
+    if match:
+        # Truncate at the first punctuation character
+        return text[:match.start()]
+    else:
+        # No punctuation found, return the original text
+        return text
+
+# COMMAND ----------
+
+from transformers import pipeline
+def pred_wrapper(model, tokenizer, prompt, model_id=1, show_metrics=True, temp=0.1, max_length=100):
+    # Build a single generation pipeline that honors the temp and max_length arguments
+    pipe = pipeline(task="text-generation",
+                    model=model,
+                    tokenizer=tokenizer,
+                    max_length=max_length,
+                    do_sample=True,
+                    temperature=temp)
+    result = pipe(f"[INST] {prompt} [/INST]")
+    generated_text = result[0]['generated_text']
+
+    # Keep only the completion that follows the closing "[/INST] " marker
+    index = generated_text.find("[/INST] ")
+    if index != -1:
+        substring_after_assistant = generated_text[index + len("[/INST] "):]
+    else:
+        # Marker not found: fall back to the entire generated text
+        substring_after_assistant = generated_text
+    substring_after_assistant = trim_llm_output(substring_after_assistant.strip()).strip()
+
+    if show_metrics:
+        # Calculate evaluation metrics (run_metrics must be defined when show_metrics=True)
+        metrics = run_metrics(substring_after_assistant, prompt, model_id)
+        return substring_after_assistant, metrics
+    else:
+        return substring_after_assistant
+
+# COMMAND ----------
+
+def predict(prompt):
+    result = pred_wrapper(model, tokenizer, prompt, show_metrics=False)
+    return result
+
+# COMMAND ----------
+
+input_example = 'Notes: rule out renal recurrence History: Renal cell carcinoma, sp partial nephrectomy'
+
+# COMMAND ----------
+
+# Define the input and output format of the model
+from mlflow.models.signature import infer_signature
+signature = infer_signature(
+    model_input=input_example,
+    model_output="CT Dedicated Kidney"
+)
+
+# COMMAND ----------
+
+import tempfile
+import os
+
+temp_dir = tempfile.TemporaryDirectory()
+tokenizer_path = os.path.join(temp_dir.name, "tokenizer")
+tokenizer.save_pretrained(tokenizer_path)
+
+model_path = os.path.join(temp_dir.name, "model")
+model.save_pretrained(model_path)
+
+mlflow.set_registry_uri("databricks-uc")
+mlflow.set_experiment("/Users/narasimha.kamathardi@databricks.com/Project: radiology label prediction using LLMs/code/sandbox")
+
+with mlflow.start_run():
+    mlflow.pyfunc.log_model(
+        "ang_nara_catalog.rad_llm.radllama2_7b_test",
+        python_model=predict,
+        artifacts={"tokenizer": tokenizer_path, "model": model_path},
+        input_example=input_example,
+        signature=signature
+    )
+    run_id = mlflow.active_run().info.run_id
+    catalog = "ang_nara_catalog"
+    schema = "rad_llm"
+    model_name = "radllama2_7b_test"
+    mlflow.register_model(
+        model_uri="runs:/" + run_id + "/ang_nara_catalog.rad_llm.radllama2_7b_test",
+        name=f"{catalog}.{schema}.{model_name}")
+
+temp_dir.cleanup()
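+
+# COMMAND ----------
+
+# 06_mlflow_batch_prediciton loads the model through the "radllama" alias, so
+# point the alias at the version registered above. A minimal sketch, assuming
+# the newest version under this name is the one just registered.
+from mlflow import MlflowClient
+
+client = MlflowClient()
+full_name = f"{catalog}.{schema}.{model_name}"
+latest_version = max(int(mv.version) for mv in client.search_model_versions(f"name='{full_name}'"))
+client.set_registered_model_alias(full_name, "radllama", latest_version)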
diff --git a/06_mlflow_batch_prediciton.py b/06_mlflow_batch_prediciton.py
new file mode 100644
index 0000000..eab11a4
--- /dev/null
+++ b/06_mlflow_batch_prediciton.py
@@ -0,0 +1,45 @@
+# Databricks notebook source
+!pip install mlflow --upgrade
+
+# COMMAND ----------
+
+dbutils.library.restartPython()
+
+# COMMAND ----------
+
+import mlflow
+mlflow.set_registry_uri("databricks-uc")
+
+# COMMAND ----------
+
+import pandas as pd
+data = pd.read_csv('/Volumes/ang_nara_catalog/rad_llm/clinical_data/batch_prediction_20_notes.csv')
+notes = data[['clinical_notes']]
+
+# COMMAND ----------
+
+# You can update the catalog and schema name containing the model in Unity Catalog if needed
+CATALOG_NAME = "ang_nara_catalog"
+SCHEMA_NAME = "rad_llm"
+MODEL_NAME = f"{CATALOG_NAME}.{SCHEMA_NAME}.radllama2_7b_test"
+
+# COMMAND ----------
+
+import mlflow.pyfunc
+model_uri = "models:/{model_name}@radllama".format(model_name=MODEL_NAME)
+model = mlflow.pyfunc.load_model(model_uri)
+
+# COMMAND ----------
+
+# Pass each raw note string to the model, matching the string signature it was logged with
+prediction = notes['clinical_notes'].apply(lambda note: model.predict(note))
+data['prediction'] = prediction
+spark_data = spark.createDataFrame(data)
+
+# COMMAND ----------
+
+spark_data.write.format("delta").mode("overwrite").saveAsTable("ang_nara_catalog.rad_llm.rad_batch_pred_20_notes")
+
+# COMMAND ----------
+
+# MAGIC %sql
+# MAGIC select * from ang_nara_catalog.rad_llm.rad_batch_pred_20_notes
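+
+# COMMAND ----------
+
+# Optional spot check (illustrative): run a single note through the loaded
+# pyfunc model, assuming it accepts a raw string as in the batch cell above.
+print(model.predict(data['clinical_notes'].iloc[0]))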
diff --git a/_Analysis.py b/_Analysis.py
deleted file mode 100644
index 5a5128d..0000000
--- a/_Analysis.py
+++ /dev/null
@@ -1,3 +0,0 @@
-# Databricks notebook source
-# MAGIC %md
-# MAGIC You may find this series of notebooks at https://github.com/databricks-industry-solutions/sample-repo. For more information about this solution accelerator, visit https://www.databricks.com/solutions/accelerators/sample-accelerator
diff --git a/_Introduction_And_Setup.py b/_Introduction_And_Setup.py
deleted file mode 100644
index 5a5128d..0000000
--- a/_Introduction_And_Setup.py
+++ /dev/null
@@ -1,3 +0,0 @@
-# Databricks notebook source
-# MAGIC %md
-# MAGIC You may find this series of notebooks at https://github.com/databricks-industry-solutions/sample-repo. For more information about this solution accelerator, visit https://www.databricks.com/solutions/accelerators/sample-accelerator
diff --git a/util/data-extract.py b/util/data-extract.py
deleted file mode 100644
index b3e34f0..0000000
--- a/util/data-extract.py
+++ /dev/null
@@ -1,52 +0,0 @@
-# Databricks notebook source
-# MAGIC %md The purpose of this notebook is to download and set up the data we will use for the solution accelerator. Before running this notebook, make sure you have entered your own credentials for Kaggle and have agreed to the Terms and Conditions of using this dataset.
-
-# COMMAND ----------
-
-# MAGIC %pip install kaggle
-
-# COMMAND ----------
-
-# MAGIC %md
-# MAGIC Set Kaggle credential configuration values in the block below: You can set up a [secret scope](https://docs.databricks.com/security/secrets/secret-scopes.html) to manage credentials used in notebooks. For the block below, we have manually set up the `solution-accelerator-cicd` secret scope and saved our credentials there for internal testing purposes.
-
-# COMMAND ----------
-
-import os
-# os.environ['kaggle_username'] = 'YOUR KAGGLE USERNAME HERE' # replace with your own credential here temporarily or set up a secret scope with your credential
-os.environ['kaggle_username'] = dbutils.secrets.get("solution-accelerator-cicd", "kaggle_username")
-
-# os.environ['kaggle_key'] = 'YOUR KAGGLE KEY HERE' # replace with your own credential here temporarily or set up a secret scope with your credential
-os.environ['kaggle_key'] = dbutils.secrets.get("solution-accelerator-cicd", "kaggle_key")
-
-# COMMAND ----------
-
-# MAGIC %md Download the data from Kaggle using the credentials set above:
-
-# COMMAND ----------
-
-# MAGIC %sh
-# MAGIC cd /databricks/driver
-# MAGIC export KAGGLE_USERNAME=$kaggle_username
-# MAGIC export KAGGLE_KEY=$kaggle_key
-# MAGIC kaggle datasets download -d frtgnn/dunnhumby-the-complete-journey
-# MAGIC unzip dunnhumby-the-complete-journey.zip
-
-# COMMAND ----------
-
-# MAGIC %md Move the downloaded data to the folder used throughout the accelerator:
-
-# COMMAND ----------
-
-dbutils.fs.mv("file:/databricks/driver/campaign_desc.csv", "dbfs:/tmp/propensity/bronze/campaign_desc.csv")
-dbutils.fs.mv("file:/databricks/driver/campaign_table.csv", "dbfs:/tmp/propensity/bronze/campaign_table.csv")
-dbutils.fs.mv("file:/databricks/driver/causal_data.csv", "dbfs:/tmp/propensity/bronze/causal_data.csv")
-dbutils.fs.mv("file:/databricks/driver/coupon.csv", "dbfs:/tmp/propensity/bronze/coupon.csv")
-dbutils.fs.mv("file:/databricks/driver/coupon_redempt.csv", "dbfs:/tmp/propensity/bronze/coupon_redempt.csv")
-dbutils.fs.mv("file:/databricks/driver/hh_demographic.csv", "dbfs:/tmp/propensity/bronze/hh_demographic.csv")
-dbutils.fs.mv("file:/databricks/driver/product.csv", "dbfs:/tmp/propensity/bronze/product.csv")
-dbutils.fs.mv("file:/databricks/driver/transaction_data.csv", "dbfs:/tmp/propensity/bronze/transaction_data.csv")
-
-# COMMAND ----------
-
-
diff --git a/util/generate-iot-data.py b/util/generate-iot-data.py
deleted file mode 100644
index 2bb62af..0000000
--- a/util/generate-iot-data.py
+++ /dev/null
@@ -1,106 +0,0 @@
-# Databricks notebook source
-# MAGIC %md
-# MAGIC
-# MAGIC ## IoT Data Generation
-# MAGIC
-# MAGIC
-# MAGIC
-# MAGIC In this notebook, we use `dbldatagen` to generate fictitious data and push into a Kafka topic.
-
-# COMMAND ----------

-# MAGIC %md
-# MAGIC Generate the Data
-
-# COMMAND ----------
-
-# MAGIC %run ./notebook-config
-
-# COMMAND ----------
-
-import dbldatagen as dg
-import dbldatagen.distributions as dist
-from pyspark.sql.types import IntegerType, FloatType, StringType, LongType
-
-states = [ 'AK', 'AL', 'AR', 'AZ', 'CA', 'CO', 'CT', 'DC', 'DE', 'FL', 'GA',
-           'HI', 'IA', 'ID', 'IL', 'IN', 'KS', 'KY', 'LA', 'MA', 'MD', 'ME',
-           'MI', 'MN', 'MO', 'MS', 'MT', 'NC', 'ND', 'NE', 'NH', 'NJ', 'NM',
-           'NV', 'NY', 'OH', 'OK', 'OR', 'PA', 'RI', 'SC', 'SD', 'TN', 'TX',
-           'UT', 'VA', 'VT', 'WA', 'WI', 'WV', 'WY' ]
-
-table_name = "iot_stream_example_"
-spark.sql(f"drop table if exists {table_name}")
-
-data_rows = 2000
-df_spec = (
-    dg.DataGenerator(
-        spark,
-        name="test_data_set1",
-        rows=data_rows,
-        partitions=4
-    )
-    .withIdOutput()
-    .withColumn("device_id", IntegerType(), minValue=1, maxValue=1000)
-    .withColumn(
-        "device_model",
-        StringType(),
-        values=['mx2000', 'xft-255', 'db-1000', 'db-2000', 'mlr-120'],
-        random=True
-    )
-    .withColumn("timestamp", LongType(), minValue=1577833200, maxValue=1673714337, random=True)
-    .withColumn("sensor_1", IntegerType(), minValue=-10, maxValue=100, random=True, distribution=dist.Gamma(40.0,9.0))
-    .withColumn("sensor_2", IntegerType(), minValue=0, maxValue=10, random=True)
-    .withColumn("sensor_3", FloatType(), minValue=0.0001, maxValue=1.0001, random=True)
-    .withColumn("state", StringType(), values=states, random=True)
-)
-
-df = df_spec.build()
-display(df)
-
-# COMMAND ----------
-
-# MAGIC %md
-# MAGIC Write to Kafka
-
-# COMMAND ----------
-
-from pyspark.sql.functions import to_json, struct, col, cast
-from pyspark.sql.types import BinaryType
-import time
-
-#Get the data ready for Kafka
-kafka_ready_df = (
-    df.select(
-        col("id").cast(BinaryType()).alias("key"),
-        to_json(
-            struct(
-                [col(column) for column in df.columns]
-            )
-        ).cast(BinaryType()).alias("value")
-    )
-)
-
-display(kafka_ready_df)
-
-# COMMAND ----------
-
-options = {
-    "kafka.ssl.endpoint.identification.algorithm": "https",
-    "kafka.sasl.jaas.config": sasl_config,
-    "kafka.sasl.mechanism": sasl_mechanism,
-    "kafka.security.protocol" : security_protocol,
-    "kafka.bootstrap.servers": kafka_bootstrap_servers,
-    "group.id": 1,
-    "subscribe": topic,
-    "topic": topic,
-    "checkpointLocation": checkpoint_path
-}
-
-#Write to Kafka
-(
-    kafka_ready_df
-    .write
-    .format("kafka")
-    .options(**options)
-    .save()
-)
{database}") - -# COMMAND ---------- - -# DBTITLE 1,mlflow settings -import mlflow -model_name = "iot_anomaly_detection_xgboost" -username = dbutils.notebook.entry_point.getDbutils().notebook().getContext().userName().get() -mlflow.set_experiment('/Users/{}/iot_anomaly_detection'.format(username)) - -# COMMAND ---------- - -