Building an LLMOPs Pipeline

Author:Murphy  |  View: 21247  |  Time: 2025-03-22 23:13:49
Image from Unsplash by Sigmund

2023 was the year that witnessed the rise of various Large Language Models (LLMs) in the Generative AI space. LLMs have incredible power and potential, but productionizing them has been a consistent challenge for users. An especially prevalent question is which LLM to use and, more specifically, how to evaluate an LLM for accuracy. This is especially challenging when there is a large number of models to choose from, different datasets for fine-tuning/RAG, and a variety of prompt engineering/tuning techniques to consider.

To solve this problem we need to establish DevOps best practices for LLMs: a workflow or pipeline that helps one evaluate different models, datasets, and prompts. This field is starting to be known as LLMOPs/FMOPs. Some of the parameters that can be considered in LLMOPs are shown below, in an (extremely) simplified flow:

LLM Evaluation Consideration (By Author)

In this article, we'll try to tackle this problem by building a pipeline that fine-tunes, deploys, and evaluates a Llama 7B model. You can also scale this example by using it as a template to compare multiple LLMs, datasets, and prompts. We'll be utilizing the following tools to build this pipeline:

  • SageMaker JumpStart: SageMaker JumpStart provides various FM/LLMs out of the box for both fine-tuning and deployment. Both these processes can be quite complicated, so JumpStart abstracts out the specifics and enables you to specify your dataset and model metadata to conduct fine-tuning and deployment. In this case we select Llama 7B and conduct Instruction fine-tuning which is supported out of the box. For a deeper introduction into JumpStart fine-tuning please refer to this blog and this Llama code sample, which we'll use as a reference.
  • SageMaker Clarify/FMEval: SageMaker Clarify provides a Foundation Model Evaluation tool via the SageMaker Studio UI and the open-source Python FMEVal library. The feature comes built-in with a variety of different algorithms spanning different NLP domains such as Text Generation and Summarization. In this example we utilize the library to evaluate the Llama model for a summarization use-case. To get a deeper introduction into the library refer to my starter article here.
  • SageMaker Pipelines Step Decorator: SageMaker Pipelines is an MLOPs feature within SageMaker that helps you operationalize ML workflows. With Pipelines you can define different steps and parameters to build your ML workflow. Within Pipelines specifically, there's a feature known as the step decorator, which lets you lift and shift Python code into functions that you can chain together as a Pipeline. In this example, we'll use this feature to build functions for training and evaluation using the tools defined above. For an introduction to the Step Decorator, refer to my starter article here.

Now that we understand our stack, let's get hands on!

NOTE: This article assumes a basic understanding of Python, LLMs, and Amazon SageMaker.

DISCLAIMER: I am a Machine Learning Architect at AWS and my opinions are my own.

Table of Contents

  1. Setup & Dataset Preparation
  2. Pipeline Step Building
  3. Pipeline Execution
  4. Additional Resources & Conclusion

1. Setup & Dataset Preparation

For development, we'll be working in the new SageMaker Studio environment (local kernel support enabled). We'll be utilizing an ml.c5.18xlarge instance with a Python3 kernel.

For our use-case we'll be fine-tuning and evaluating a summarization use-case with Llama. For our dataset, we'll use the public Dolly Dataset (License: cc-by-sa-3.0). We can pull this dataset using the built-in HuggingFace datasets library and filter it for the summarization data points. We also create a train and test dataset for both fine-tuning and evaluation.

from datasets import load_dataset

# dolly dataset
dolly_dataset = load_dataset("databricks/databricks-dolly-15k", split="train")

# summarization use-case
summarization_dataset = dolly_dataset.filter(lambda example: example["category"] == "summarization")
summarization_dataset = summarization_dataset.remove_columns("category")

# train test split
train_and_test_dataset = summarization_dataset.train_test_split(test_size=0.1)

# local train dataset
train_and_test_dataset["train"].to_json("train.jsonl")

# test dataset
train_and_test_dataset["test"].to_json("test.jsonl")
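Before uploading anything, it can be worth confirming that the exported files look the way the later steps expect. The snippet below is a minimal sketch, not part of the original pipeline: `check_jsonl` is a hypothetical helper, and the demo record is made up. It assumes each line is one JSON object carrying the `instruction`, `context`, and `response` fields that remain after the `category` column is removed.

```python
import json
import os
import tempfile

# Fields we expect in each Dolly record once "category" has been removed.
REQUIRED_KEYS = {"instruction", "context", "response"}


def check_jsonl(path, required_keys=REQUIRED_KEYS):
    """Return the number of records, raising if any record lacks a required key."""
    count = 0
    with open(path, encoding="utf-8") as fh:
        for line_number, line in enumerate(fh, start=1):
            if not line.strip():
                continue  # tolerate blank lines
            record = json.loads(line)
            missing = required_keys - set(record)
            if missing:
                raise ValueError(f"line {line_number} is missing {sorted(missing)}")
            count += 1
    return count


# Demo on a throwaway file holding one made-up record.
sample = {"instruction": "Summarize the text.", "context": "Some text.", "response": "A summary."}
with tempfile.TemporaryDirectory() as tmp:
    demo_path = os.path.join(tmp, "demo.jsonl")
    with open(demo_path, "w", encoding="utf-8") as fh:
        fh.write(json.dumps(sample) + "\n")
    demo_count = check_jsonl(demo_path)
print(demo_count)
```

In practice you would call `check_jsonl("train.jsonl")` and `check_jsonl("test.jsonl")` on the files written above.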

We also specify the model metadata so that JumpStart can retrieve the proper Llama 7B model.

import sagemaker

model_id, model_version = "meta-textgeneration-llama-2-7b", "2.*"

For training we'll be implementing Instruction fine-tuning so we prepare a template for the prompt and response, or in this case the text and the summary. We'll be using the following example as a reference for the training portion.

import json

template = {
    "prompt": "Below is an instruction that describes a task, paired with an input that provides further context. "
    "Write a response that appropriately completes the request.\n\n"
    "### Instruction:\n{instruction}\n\n### Input:\n{context}\n\n",
    "completion": " {response}",
}
with open("template.json", "w") as f:
    json.dump(template, f)
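To make the template concrete, here is a small sketch of how one record could be rendered into a prompt/completion pair. This only illustrates the placeholder substitution; the JumpStart training script does its own formatting internally and may differ in detail. `render_example` is a hypothetical helper and the record is made up.

```python
# The template from above, repeated so the snippet runs on its own.
template = {
    "prompt": "Below is an instruction that describes a task, paired with an input that provides further context. "
    "Write a response that appropriately completes the request.\n\n"
    "### Instruction:\n{instruction}\n\n### Input:\n{context}\n\n",
    "completion": " {response}",
}


def render_example(record, template=template):
    """Fill the template placeholders with the fields of a single record."""
    return {
        "prompt": template["prompt"].format(
            instruction=record["instruction"], context=record["context"]
        ),
        "completion": template["completion"].format(response=record["response"]),
    }


# Made-up record with the same fields as the Dolly summarization subset.
record = {
    "instruction": "Summarize the text.",
    "context": "Some context",
    "response": "A summary.",
}
rendered = render_example(record)
print(rendered["prompt"])
print(rendered["completion"])
```

The placeholder names in the template have to match the column names in train.jsonl, which is why the dataset keeps its `instruction`, `context`, and `response` fields.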

We then upload these files to a common S3 path for training and also inference/evaluation later.

from sagemaker.s3 import S3Uploader
import sagemaker
import random

output_bucket = sagemaker.Session().default_bucket()
local_data_file = "train.jsonl"
test_data_file = "test.jsonl"
train_data_location = f"s3://{output_bucket}/dolly_dataset"
test_data_location = f"s3://{output_bucket}/test_dataset"
S3Uploader.upload(local_data_file, train_data_location)
S3Uploader.upload("template.json", train_data_location)
S3Uploader.upload(test_data_file, test_data_location)
print(f"Training data: {train_data_location}")
print(f"Test data: {test_data_location}")
print(f"Output bucket: {output_bucket}")
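The evaluation step later downloads the test file by bucket and key rather than by S3 URI, so it helps to see how the two relate. The sketch below is an illustration with a hypothetical helper, `split_s3_uri`, and a placeholder bucket name. It assumes the uploaded file lands at the given prefix followed by the file name.

```python
from urllib.parse import urlparse


def split_s3_uri(uri):
    """Split an s3://bucket/prefix URI into (bucket, prefix)."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"not an S3 URI: {uri!r}")
    return parsed.netloc, parsed.path.lstrip("/")


# "example-bucket" is a placeholder for sagemaker.Session().default_bucket().
bucket, prefix = split_s3_uri("s3://example-bucket/test_dataset")
key = f"{prefix}/test.jsonl"
print(bucket, key)
```

With the paths used in this article, the key works out to test_dataset/test.jsonl, which is the default the evaluation function uses when it pulls the file back down.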

2. Pipeline Step Building

Pipeline Setup

To set up our Pipeline we need a few individual files that define our execution environment. One is a config.yaml file which will define the hardware for the Pipeline execution as well as any other configurations that you define. The config file also points towards your requirements.txt which will get installed in the Pipeline environment.

# config.yaml
SchemaVersion: '1.0'
SageMaker:
  PythonSDK:
    Modules:
      RemoteFunction:
        InstanceType: ml.m5.xlarge
        Dependencies: ./requirements.txt
        IncludeLocalWorkDir: true
        CustomFileFilter:
          IgnoreNamePatterns: # files or directories to ignore
          - "*.ipynb" # all notebook files

# requirements.txt
jsonlines
sagemaker
fmeval

We can then point towards this config file as an environment variable:

import os

# Set path to config file
os.environ["SAGEMAKER_USER_CONFIG_OVERRIDE"] = os.getcwd()
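A missing or misplaced config file tends to surface only once the Pipeline runs, so a quick local check can save a round trip. This is a minimal sketch under the assumption that a directory override means the SDK looks for a file named config.yaml inside it; `find_user_config` is a hypothetical helper, not an SDK function.

```python
import os
import tempfile
from pathlib import Path


def find_user_config(env=None):
    """Resolve the config file that SAGEMAKER_USER_CONFIG_OVERRIDE points at, or None."""
    env = os.environ if env is None else env
    override = env.get("SAGEMAKER_USER_CONFIG_OVERRIDE")
    if not override:
        return None
    path = Path(override)
    # Assumption: a directory override means "<directory>/config.yaml".
    candidate = path / "config.yaml" if path.is_dir() else path
    return candidate if candidate.is_file() else None


# Demo against a throwaway directory instead of the real working directory.
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "config.yaml").write_text("SchemaVersion: '1.0'\n")
    found = find_user_config({"SAGEMAKER_USER_CONFIG_OVERRIDE": tmp})
    found_name = found.name if found else None
print(found_name)
```

Calling `find_user_config()` with no arguments reads the real environment, so it can be run right after setting the variable above.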

Optionally with Pipelines you can also define parameters that will get injected into your Pipeline. In this case we define the hardware instance type for the job that conducts each step:

import sagemaker
from sagemaker.workflow.function_step import step
from sagemaker.workflow.parameters import ParameterString

sagemaker_session = sagemaker.session.Session()
role = sagemaker.get_execution_role()
bucket = sagemaker_session.default_bucket()
region = sagemaker_session.boto_region_name

instance_type = ParameterString(
    name="TrainInstanceType",
    default_value="ml.c5.18xlarge",
)

Training & Deploy Step

For the training step we will be working with SageMaker JumpStart to fine-tune our Llama 7B model. We pass in a few different parameters to the function:

  • Training Data Path: This is the S3 location that will point towards the train.jsonl and template.json file.
  • Model Metadata: JumpStart recognizes what model to pulldown depending on the Model ID and Version that you pass in. In this case we specify the Llama 7B model.

Once we have defined these, we set up the JumpStart Estimator with these parameters. Optionally, you can also define model-specific hyperparameters for fine-tuning, depending on the knobs that you want to test for performance. Note the step decorator on the function: it signals that we are dealing with a SageMaker Pipeline step rather than a vanilla Python function.

# step one
@step(
    name = "train-deploy",
    instance_type = instance_type,
    keep_alive_period_in_seconds=300
)
def train_deploy(
    train_data_path: str,
    model_id: str = "meta-textgeneration-llama-2-7b",
    model_version: str = "2.*",
) -> str:
    import sagemaker
    from sagemaker.jumpstart.estimator import JumpStartEstimator

    # configure JumpStart Estimator
    estimator = JumpStartEstimator(
        model_id=model_id,
        model_version=model_version,
        environment={"accept_eula": "true"},
        disable_output_compression=True, 
    )
    estimator.set_hyperparameters(instruction_tuned="True", epoch="1", max_input_length="1024")
    estimator.fit({"training": train_data_path})

    ## deploy fine-tuned model
    finetuned_predictor = estimator.deploy()
    endpoint_name = finetuned_predictor.endpoint_name
    return endpoint_name

Post training we deploy the fine-tuned Llama model to a SageMaker Real-Time Endpoint, which we can invoke for inference. By default, JumpStart selects an instance type for both training and inference depending on the LLM you have chosen, but you can override this if you would like to select the hardware yourself.

We return the endpoint name as an input for our next step, so that we can conduct inference and evaluation with that endpoint. When you do end up executing the Pipeline, for this step you will see a successful JumpStart Training Job and Endpoint created in the Studio UI.

JumpStart Training Job (Screenshot by Author)
Endpoint Created (Screenshot by Author)

Now that we have our endpoint, we can move on to running sample inference and conducting evaluation on those results.

Inference & Evaluation Step

For our second step we once again define certain parameters for our function:

  • Endpoint Name: We perform inference against this endpoint prior to evaluation.
  • S3_Test_Path: We had earlier also pushed a test dataset separate from our training dataset to S3. We'll pull the dataset down from this path and run inference prior to evaluation.

We first download the S3 "test.jsonl" file utilizing the Boto3 Python SDK:

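# NOTE: to run as a Pipeline step, this function needs the @step decorator, as with train_deploy
def evaluate(endpoint_name: str, output_bucket: str = output_bucket, test_data_file: str = "test.jsonl",
            key_path: str = "test_dataset/test.jsonl") -> str:
    import boto3

    # download the test set from S3 to the local working directory
    s3 = boto3.client("s3")
    s3.download_file(output_bucket, key_path, test_data_file)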

We then run inference on each record in this file and write the results to a new JSONLines file, which our evaluation algorithms can consume. We limit this sample to just 20 data points in the interest of time, but you can increase the size of the test dataset as needed.

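import json

import boto3
import jsonlines

# Assumed setup (not shown in the original snippet)
input_file = "test.jsonl"      # the file downloaded from S3 above
output_file = "results.jsonl"  # consumed by the evaluation below
content_type = "application/json"
runtime = boto3.client("sagemaker-runtime")

# prepare_payload builds the request body from a datapoint (helper defined elsewhere)
with jsonlines.open(input_file) as input_fh, jsonlines.open(output_file, "w") as output_fh:
    for i, datapoint in enumerate(input_fh, start=1):
        instruction = datapoint["instruction"]
        context = datapoint["context"]
        summary = datapoint["response"]
        payload = prepare_payload(datapoint)
        response = runtime.invoke_endpoint(EndpointName=endpoint_name, Body=json.dumps(payload),
                                           ContentType=content_type, CustomAttributes='accept_eula=true')
        result = json.loads(response['Body'].read().decode())[0]['generation']
        line = {"instruction": instruction, "context": context, "summary": summary, "model_output": result}
        output_fh.write(line)

        # evaluate just 20 datapoints for example
        if i == 20:
            break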
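The loop above relies on a `prepare_payload` helper that is not shown. The following is one possible sketch, not the author's implementation: it assumes the endpoint accepts a JSON body with an `inputs` string and a `parameters` dict, that the prompt should follow the same layout as the fine-tuning template, and that the response is a list whose first element carries a `generation` field (as the loop above reads it). `PROMPT_TEMPLATE`, `parse_generation`, and the `max_new_tokens` default are illustrative choices.

```python
import json

# Assumption: inference prompts use the same layout as the fine-tuning template.
PROMPT_TEMPLATE = (
    "Below is an instruction that describes a task, paired with an input that provides further context. "
    "Write a response that appropriately completes the request.\n\n"
    "### Instruction:\n{instruction}\n\n### Input:\n{context}\n\n"
)


def prepare_payload(datapoint, max_new_tokens=256):
    """Build the request body for one test record (payload shape is an assumption)."""
    prompt = PROMPT_TEMPLATE.format(
        instruction=datapoint["instruction"], context=datapoint["context"]
    )
    return {"inputs": prompt, "parameters": {"max_new_tokens": max_new_tokens}}


def parse_generation(body_bytes):
    """Extract the generated text from a response body shaped like [{"generation": ...}]."""
    return json.loads(body_bytes.decode())[0]["generation"]


# Made-up record and response body, just to exercise the two helpers.
demo_payload = prepare_payload({"instruction": "Summarize the text.", "context": "Some text."})
demo_result = parse_generation(b'[{"generation": "A summary."}]')
print(json.dumps(demo_payload, indent=2))
print(demo_result)
```

Keeping the inference prompt aligned with the training template is a design choice: the fine-tuned model has only seen inputs in that layout, so a different layout at inference time may hurt the summaries being evaluated.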

Once this part of the function has run, it should generate a "results.jsonl" file. This file contains a few different fields:

  • Document/Input: This is the original text that needs to be summarized.
  • Ground Truth/Actual Output: This is the summary from the test dataset, which serves as the ground truth we are evaluating against.
  • Model Output: This is the inference we conducted with our fine-tuned Llama 7B model. We will run our evaluation algorithm on the Ground Truth values vs the Model Inference results.

Now that we have our dataset for evaluation, we can import the FMEval library:

import fmeval
from fmeval.data_loaders.data_config import DataConfig
from fmeval.constants import MIME_TYPE_JSONLINES
from fmeval.eval_algorithms.summarization_accuracy import SummarizationAccuracy

Note that we import the SummarizationAccuracy algorithm, which returns metrics such as Meteor, Rouge, and BertScore. To see the full implementation of these algorithms, you can refer to the open-source code at this link. In general, each of these metrics has its own pros and cons, and you can select which metric you want to use for evaluation.

  • Rouge: Rouge-N scores measure N-gram word overlap between the ground truth and the model inference summary.
  • Meteor: Goes beyond exact word overlap by incorporating stemming and synonyms, so it captures more similarities between texts when the words don't match exactly.
  • BertScore: Matches words using cosine similarity over pre-trained BERT embeddings. You'll see the model downloaded upon installation of the package.
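To give a feel for what N-gram overlap means, here is a deliberately simplified Rouge-N sketch. It is not the FMEval implementation: it assumes whitespace tokenization and lowercasing, and reports an F1 score over clipped N-gram counts. `rouge_n` and the example sentences are illustrative.

```python
from collections import Counter


def rouge_n(reference, candidate, n=1):
    """Simplified Rouge-N F1: clipped n-gram overlap between two texts."""

    def ngrams(text):
        tokens = text.lower().split()
        return Counter(tuple(tokens[i : i + n]) for i in range(len(tokens) - n + 1))

    ref, cand = ngrams(reference), ngrams(candidate)
    overlap = sum((ref & cand).values())  # counts clipped to the smaller side
    if overlap == 0:
        return 0.0
    precision = overlap / sum(cand.values())
    recall = overlap / sum(ref.values())
    return 2 * precision * recall / (precision + recall)


reference = "the cat sat on the mat"
candidate = "the cat lay on the mat"
rouge_1 = rouge_n(reference, candidate, n=1)
rouge_2 = rouge_n(reference, candidate, n=2)
print(round(rouge_1, 3), round(rouge_2, 3))
```

The single swapped word costs one of six unigrams but two of five bigrams, which shows why higher-order Rouge scores drop faster when wording diverges. Meteor and BertScore soften exactly this sensitivity to surface wording.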

For a deeper dive into each of these metrics, I would reference the following article.

For our implementation using the FMEval package, we first configure our dataset in an FMEval-specific DataConfig object and specify the ground truth and model inference columns.

config = DataConfig(
    dataset_name="dolly_summary_model_outputs",
    dataset_uri="results.jsonl",
    dataset_mime_type=MIME_TYPE_JSONLINES,
    model_input_location="instruction",
    target_output_location="summary",
    model_output_location="model_output",
)

We then instantiate the SummarizationAccuracy algorithm and run an evaluation with our DataConfig object specified.

eval_algo = SummarizationAccuracy()
eval_output = eval_algo.evaluate(dataset_config=config, save=True)
res = json.dumps(eval_output, default=vars, indent=4)
serialized_data = json.loads(res)
# print metrics to CW logs, realistically push to somewhere to visualize
for item in serialized_data:
    for key, value in item.items():
        print(f"Key: {key}, Value: {value}")

In this case we write the metrics to CloudWatch logs directly. In a realistic use-case you can push them to S3 or a visualization tool such as QuickSight for a prettier view of your evaluation. If you check the CloudWatch logs for the second step after the Pipeline execution, you will see the metrics printed there.
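If you do want to persist the metrics rather than only print them, flattening them into a simple name-to-value mapping is a convenient first step. The sketch below is an assumption-heavy illustration: it presumes each serialized item carries an `eval_name` and a `dataset_scores` list of entries with `name` and `value`, and the numbers in `example_serialized` are placeholders, not real results. `flatten_scores` is a hypothetical helper.

```python
import json


def flatten_scores(serialized_data):
    """Turn serialized evaluation output into a flat {"eval/metric": value} mapping."""
    flat = {}
    for item in serialized_data:
        for score in item.get("dataset_scores") or []:
            flat[f"{item.get('eval_name')}/{score['name']}"] = score["value"]
    return flat


# Placeholder structure and values for illustration only; not real evaluation results.
example_serialized = [
    {
        "eval_name": "summarization_accuracy",
        "dataset_name": "dolly_summary_model_outputs",
        "dataset_scores": [
            {"name": "meteor", "value": 0.1},
            {"name": "rouge", "value": 0.2},
            {"name": "bertscore", "value": 0.3},
        ],
    }
]
flat_metrics = flatten_scores(example_serialized)
print(json.dumps(flat_metrics, indent=2))
```

A flat mapping like this is easy to write to a JSON file and upload to S3, or to compare across runs when you extend the pipeline to multiple models or prompts.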

Evaluation Metrics (Screenshot by Author)

3. Pipeline Execution

Once the two Pipeline Steps have been defined, you can simply chain them together as you would with vanilla Python functions.

# stitch together pipeline
from sagemaker.workflow.pipeline import Pipeline

endpoint_name = train_deploy(train_data_location)
eval_metrics = evaluate(endpoint_name)

We can then define a Pipeline object and kick off an execution:

pipeline = Pipeline(
    name="llm-train-eval-pipeline",
    parameters=[
        instance_type
    ],
    steps=[
        eval_metrics,
    ],
)

# execute Pipeline
pipeline.upsert(role_arn=role)
execution = pipeline.start()
execution.describe()
execution.wait()

You can view and monitor the Pipeline execution in the Studio UI. This Pipeline takes about 45 minutes to complete successfully.

Execution Status (Screenshot by Author)
DAG (Screenshot by Author)

4. Additional Resources & Conclusion

GenAI-Samples/LLMOps-Pipeline at master · RamVegiraju/GenAI-Samples

I hope this article was a useful introduction to LLMOPs and to building a Pipeline utilizing different SageMaker features. As LLM use-cases expand, so does the need for proper experimentation to identify the ideal configuration for your LLM. This example can be expanded to incorporate multiple LLMs, datasets, prompt templates, and more. Stay tuned for more content in the GenAI/LLM space.

As always thank you for reading and feel free to leave any feedback.


