亚马逊AWS官方博客

使用 SageMaker InferenceComponent 和 LiteLLM 构建自己的 MaaS 平台

背景简介

在当今大模型快速发展的 GenAI 时代,企业内部对于模型即服务(MaaS)的需求日益增长。很多公司希望建立一个自主可控的 MaaS 中台,通过统一兼容 OpenAI 的 HTTP 接口,为业务部门提供多种模型的访问。

多样化的模型选择

客户希望在 MaaS 中台上集成多种模型,既包括市场上成熟的 SaaS 化模型,如 HAQM Nova、Claude Sonnet 3.7 等,也包括自主部署的 DeepSeek-R1 蒸馏版模型。这种多样化的模型选择,能够满足不同业务场景下的需求,为企业带来更大的灵活性和创新空间。

高性能硬件支持

为了确保模型的高效运行,有的客户配备了A100 8卡高端服务器。这些服务器用于部署基于开源自建 DeepSeek-R1 蒸馏版本模型的不同规格,比如 32B 和 1.5B,能够满足从高精度到快速响应的各种需求。

提升资源利用率

客户希望通过这个 MaaS 中台,最大限度地提升推理服务器的资源利用率,以应对企业内部不同规模的流量需求。为了实现这一目标,需要对后端部署的不同版本 R1 蒸馏模型提供细粒度模型副本级别的弹性扩缩功能。这意味着系统能够根据实际需求,动态调整模型的副本数量,从而在保证性能的同时,有效降低资源浪费。

整体架构

根据以上各类需求,我们推荐采用 HAQM SageMaker Inference Component 结合开源 LiteLLM 的部署方案,具体如下:

主要组件

  • OpenAPI RESTful HTTP:这是客户端与 MaaS 中台之间的接口,通过 OpenAPI 标准,确保了接口的通用性和兼容性。客户可以通过这个接口发送请求,获取模型的响应。
  • Max token & Model temperature:OpenAI 兼容的参数用于控制模型的输出。对后端不同的模型屏蔽 api 的差异性。
  • LiteLLM Proxy:作为中间层,LiteLLM Proxy 负责处理来自客户端的请求,并将其转发到相应的模型服务。它支持多种模型,包括 Nova Models、Claude Models 和自建的 DeepSeek Models。
  • EKS:HAQM Elastic Kubernetes Service(EKS)是我们用来管理和部署容器化应用的平台。它提供了高可用性和可扩展性,确保了模型服务的稳定运行。
  • SageMaker Inference Component:这是我们用来部署和管理模型的核心组件。它支持自建托管多种 LLM 的推理服务,包括 DeepSeek 的不同版本(如 32B 和 5B),以及更多的长尾的开源模型(如 HuggingFace)。
  • ECR:HAQM Elastic Container Registry(ECR)是我们用来存储和管理容器镜像的服务。它确保了模型部署的快速和可靠。
  • SageMaker LMI Image:这是我们用来构建和部署模型的基础镜像。它包含了所有必要的依赖和配置,确保了模型的高效运行,且提供了业界领先的推理框架,开箱即用的方式进行部署,而不需要客户自己再进行构建和配置。

工作流程

  1. 请求发送:客户通过 OpenAPI RESTful HTTP 接口发送请求,包含 Max token 和 Model temperature 参数。
  2. 请求处理:LiteLLM Proxy 接收请求,并根据请求内容将其转发到相应的模型服务。
  3. 模型响应:模型服务(如 Bedrock 或 DeepSeek)处理请求,并生成响应。
  4. 响应返回:LiteLLM Proxy 将模型的响应返回给客户端。

以下章节我们详细描述该方案的主要组件的部署步骤和方法。

SageMaker Inference Component 部署

SageMaker Inference Component 是一项新推出的功能,建立在 SageMaker 实时推理端点(SageMaker Endpoint)功能的基础上,在使用定义端点的实例类型和初始实例计数的端点配置来创建 SageMaker 推理服务后,在每个 InferenceComponent 组件中,可以指定要分配给模型每个副本的加速芯片数量/CPU 数量/显存量等,以及要部署的 LLM 模型、推理的容器映像和模型副本数量。从而允许客户更精细地控制多个模型的部署和资源分配。

SageMaker Inference Component 具体架构如下图所示:

详细信息可以参阅附录章节“SageMaker Inference Component 简介”,这里我们主要看下使用 SageMaker Inference Component(下文中简称 SageMaker IC)来部署多个 LLM 模型的方法。

创建 SageMaker Endpoint

首先,您需要创建一个 SageMaker 端点配置。这个配置包含了实例类型和初始实例计数,代码示例如下:

import boto3
import sagemaker

role = sagemaker.get_execution_role()
sm_client = boto3.client(service_name="sagemaker")
endpoint_config_name="Sagemaker-inference-componet3"
endpoint_name = "Sagemaker-inference-componet-mme3"

!aws sagemaker delete-endpoint-config --endpoint-config-name Sagemaker-inference-componet3


sm_client.create_endpoint_config(
    EndpointConfigName=endpoint_config_name,
    ExecutionRoleArn=role,
    ProductionVariants=[{
        "VariantName": "AllTraffic",
        "InstanceType": "ml.g5.48xlarge",
        "InitialInstanceCount": 1,
        "RoutingConfig": {
            "RoutingStrategy": "LEAST_OUTSTANDING_REQUESTS"
        },
    }]
)

sm_client.create_endpoint(
    EndpointName=endpoint_name,
    EndpointConfigName=endpoint_config_name,
)

配置 SageMaker IC 推理组件

SageMaker IC 推理组件允许客户指定要分配给不同的模型每个副本的加速器数量和内存量,以及模型对象、容器映像和要部署的模型副本数量,从而方便的实施多个模型的资源隔离和分配。

以一个场景为例,客户要同时部署 DeepSeek-R1-Distill-Qwen-32B 和 DeepSeek-R1-Distill-Qwen-1.5B 两种模型,在一个 8卡(Nvidia A10)的 G5 服务器上(ml.g5.48xlarge),我们可以把 32B 的模型部署在 4 张 GPU 卡上并行推理,副本为 1;1.5B 的模型部署在 2 张 GPU 卡上,副本为 2,具体如下。

首先我们需要创建一个 SageMaker Endpoint 推理端点,作为 SageMaker 不同 IC 的承载服务器:

import sagemaker
from sagemaker import Model, image_uris, serializers, deserializers
from sagemaker import get_execution_role
from sagemaker.pytorch import PyTorchModel
import time

hf_model_id="deepseek-ai/DeepSeek-R1-Distill-Qwen-32B"
# 获取 lim 推理容器
image_uri = "763104351884.dkr.ecr.us-west-2.amazonaws.com/djl-inference:0.31.0-lmi13.0.0-cu124"
print(f"Image going to be used is ---- > {image_uri}")


model_name= "deepseek-r1-distill-qwen-32b-"+time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
endpoint_name = "Sagemaker-inference-componet-mme3"

其次我们使用 SageMaker LMI vllm 推理镜像,因此需要准备 vllm 推理镜像的配置文件(server.properties)以及相关的 vllm 版本依赖(requiremnts.txt)。

serving.properties 配置如下所示:

engine=Python
option.trust_remote_code=True
option.tensor_parallel_degree=4
option.gpu_memory_utilization=.87
option.max_model_len=10240
option.model_id=deepseek-ai/DeepSeek-R1-Distill-Qwen-32B
option.max_rolling_batch_size=2
option.rolling_batch=vllm

requirements.txt 文件如下所示(这里指定 vllm 为 0.7.0 版本):

vllm==0.7.0

把 server.properties 和 requiremnts.txt 配置文件打包并上传至 s3 路径:

mkdir mymodel
mv serving.properties mymodel/
mv requirements.txt mymodel/
tar czvf mymodel.tar.gz mymodel/
rm -rf mymodel

s3_code_prefix = "large-model-lmi/code"
bucket = sess.default_bucket()  # bucket to host artifacts
code_artifact = sess.upload_data("mymodel.tar.gz", bucket, s3_code_prefix)

至此 SageMaker LMI vllm 镜像所需配置已经准备好,可以开始创建 model 和 SageMaker Inference Component 了,具体如下:

  • DeepSeek-R1-Distill-Qwen-32B 模型部署:
    container_config = {
        'Image': image_uri,
        'ModelDataUrl': source_data,
        'Environment': vllm_config
    }
    
    response = sm_client.create_model(
        ModelName=model_name,
        ExecutionRoleArn=role,
        PrimaryContainer=container_config
    )
    print(f"Model created: {response['ModelArn']}")
    
    sm_client.create_inference_component(
        InferenceComponentName="IC-deepseek-r1-distill-qwen-32b-"+time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime()),
        EndpointName=endpoint_name,
        VariantName="AllTraffic",
        Specification={
            "ModelName": model_name,
            "ComputeResourceRequirements": {
                "NumberOfAcceleratorDevicesRequired": 4,
                "MinMemoryRequiredInMb": 80096
            }
        },
        RuntimeConfig={"CopyCount": 1},
    )
    
  • DeepSeek-R1-Distill-Qwen-1.5B 模型部署:
    import sagemaker
    from sagemaker import Model, image_uris, serializers, deserializers
    from sagemaker import get_execution_role
    from sagemaker.pytorch import PyTorchModel
    import time
    
    hf_model_id="deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B"
    # 获取 lim 推理容器
    image_uri = "763104351884.dkr.ecr.us-west-2.amazonaws.com/djl-inference:0.31.0-lmi13.0.0-cu124"
    print(f"Image going to be used is ---- > {image_uri}")
    
    
    model_name= "deepseek-r1-distill-qwen-1-5b-"+time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
    endpoint_name = "Sagemaker-inference-componet-mme3"
    ### lmi vllm配置方法同上一章节32B model一样,这里不再赘述
    
    container_config = {
        'Image': image_uri,
        'ModelDataUrl': source_data,
        'Environment': vllm_config
    }
    
    response = sm_client.create_model(
        ModelName=model_name,
        ExecutionRoleArn=role,
        PrimaryContainer=container_config
    )
    
    print(f"Model created: {response['ModelArn']}")
    
    sm_client.create_inference_component(
        InferenceComponentName="IC-deepseek-r1-distill-qwen-1-5b-"+time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime()),
        EndpointName=endpoint_name,
        VariantName="AllTraffic",
        Specification={
            "ModelName": model_name,
            "ComputeResourceRequirements": {
                "NumberOfAcceleratorDevicesRequired": 1,
                "MinMemoryRequiredInMb": 4096
            }
        },
        RuntimeConfig={"CopyCount": 2},
    )
    

说明:

  • 32B 的模型 FP16 精度需要 64G 以上显存,在 A10 24G 显存的卡上,我们配置了 4 张卡,因此设置 tensor 张量并行为 4( vllm 不支持奇数,如 3 或者 5,transformers layer 无法均分到每个 GPU 上)。
  • 由于 vllm 要预填充 prefill tokens,Inference Component 的 MinGPU Memory 需要设置大一点(32B 需要 64G 以上,目前通过 MinMemoryRequiredInMb 设置为 80G)。
  • 5B 部署 2 个副本,设置 copy 参数为 2,这样会在 2 个 GPU 上部署 1.5B 的模型的两个推理实例,SageMaker IC 自动做负载均衡。
  • 这里采用了 SageMaker 的 vllm LMI 推理镜像,vllm 是 rolling batch 的动态批处理,吞吐量随着并发的增长有很好的扩展性,详细 vllm 介绍可以参考附录“vllm 推理框架”。

这里有一些 SageMaker IC 配置 vllm 推理引擎框架上需要注意的地方:

  • copies 副本资源分配,目前不支持单个 GPU 再划分 partition。
  • minGPU 需要适量加大,因为 vllm 默认会;prefill cache,如果 GPU 显存不足,适量减少 queue size(max rolling batch size)和 max model len。
  • SageMaker IC,它的 GPU,显存分配是针对单个 copy 的,48x,8 卡要部署 2 个 copy 的话,每个 copy 配置 4 卡就够了,IC 自己会扩 2 个 4 卡,就是 8 卡。
  • 多个 instance 配置扩缩,可以指定 copy 副本级别请求调用量的 metrics(注意,最终扩缩的还是 instance 实例,也就是 copies 增大,instance 也需要对应的增大,否则没有足够的 GPU 卡资源,同样会报错,扩缩失败),如下所示:
    resource_id = f"inference-component/{inference_component_name}"
    service_namespace = "sagemaker"
    scalable_dimension = "sagemaker:inference-component:DesiredCopyCount"
    min_copy_count = 0
    max_copy_count = 8
    aas_client.register_scalable_target(
    ServiceNamespace=service_namespace,
    ResourceId=resource_id,
    ScalableDimension=scalable_dimension,
    MinCapacity=min_copy_count,
    MaxCapacity=max_copy_count,
    )
    
  • 多 copies 副本 update/扩缩/initial 时候,可以通过 SageMaker IC 的 describe API 看到详细日志信息,如下所示:
    endpoint_name = "Sagemaker-inference-componet-mme3"
    sm_client.describe_inference_component(
        #InferenceComponentName='IC-deepseek-r1-distill-qwen-32b-2025-02-25-14-27-09'
        InferenceComponentName='IC-deepseek-r1-distill-qwen-32b-2025-02-25-15-43-12'
    )
    

InferenceComponent 管理

SageMaker IC 提供了多种 API 和接口,可以方便地对已经部署的 Component 模型更新,删除和新增,如下所示:

  • 查询 SageMaker IC
    endpoint_name = "Sagemaker-inference-componet-mme3"
    sm_client.list_inference_components(
        EndpointNameEquals=endpoint_name
    )
    
    endpoint_name = "Sagemaker-inference-componet-mme3"
    sm_client.describe_inference_component(
        #InferenceComponentName='IC-deepseek-r1-distill-qwen-32b-2025-02-25-14-27-09'
        InferenceComponentName='IC-deepseek-r1-distill-qwen-32b-2025-02-28-05-59-27'
    )
    
  • 删除 SageMaker IC
    ##delete all IC componet
    endpoint_name = "Sagemaker-inference-componet-mme3"
    InferenceComponents = sm_client.list_inference_components(
        EndpointNameEquals=endpoint_name
    )['InferenceComponents']
    for InferenceComponent in InferenceComponents:
        IC_name = InferenceComponent['InferenceComponentName']
        print(IC_name)
        sm_client.delete_inference_component(InferenceComponentName=IC_name)
    
  • 更新 SageMaker IC
    response = sm_client.update_inference_component(
        InferenceComponentName='IC-deepseek-r1-distill-qwen-1.5b-2025-02-25-15-43-12',
        Specification={
            "ModelName": "deepseek-r1-distill-qwen-1.5b-2025-02-25-15-43-11",
            'ComputeResourceRequirements': {
                'NumberOfAcceleratorDevicesRequired': 1,
                'MinMemoryRequiredInMb': 4096
            }
        },
        RuntimeConfig={
            'CopyCount': 8
        }
    )
    

LiteLLM

接下来我们看一下另一个重要的组件,通过 LiteLLM proxy 做 MaaS 的代理分发。

LiteLLM 是一个高效、灵活的工具,旨在简化与多个机器学习模型提供商的集成。它提供了一致的接口,使得开发者能够更容易地与不同的模型进行交互,无需关心底层的复杂实现。LiteLLM 特别适用于需要快速部署和管理多个模型的场景,能够显著提升开发效率和系统的可维护性。详见附录“LiteLLM Proxy开源代理框架”

在客户自建的 MaaS 中台中,LiteLLM proxy 扮演着至关重要的角色。以下是其主要功能和优势:

  • 多 Provider 集成:
    • 客户的 MaaS 中台需要接入各种模型,包括自建在 HAQM SageMaker Endpoint 的 LLM 模型、HAQM Bedrock、第三方 LLM SaaS 模型。LiteLLM proxy 能够无缝集成这些不同来源的模型,提供统一的访问接口。
  • 统一 API 接入:
    • 为了简化外部业务的集成,客户需要一个统一的 API 接口。LiteLLM proxy 提供了兼容 OpenAI 的 RESTful API,使得业务部门可以通过一个一致的接口与所有模型进行交互,极大地降低了集成成本和复杂度。
  • Token 分发与账单分拆:
    • 在后期,系统需要实现 token 的分发和中台账单的分拆计费。LiteLLM proxy 支持这些高级功能,确保每个请求都能正确地进行 token 管理和计费,从而实现精细化的成本控制和透明的账单分摊。

下面我们详细介绍下 LiteLLM 的安装部署和配置。

LiteLLM proxy 安装

首先,你需要安装 LiteLLM Proxy。新版本需要显式安装 proxy 组件:

pip install 'litellm[proxy]'

在使用 LiteLLM Proxy 之前,需要设置一些环境变量以确保正确的配置和日志记录:

export LITELLM_LOG=DEBUG

假设你要连接的 SageMaker Endpoint 名称为 Sagemaker-inference-componet-mme3,你还需要设置 AWS 的访问密钥(如果使用 HAQM EC2 实例的实例身份角色 则不需要这步):

export AWS_ACCESS_KEY_ID=your_aws_access_key_idexport AWS_REGION_NAME=us-west-2export AWS_SECRET_ACCESS_KEY=your_aws_secret_access_key

使用以下命令启动 LiteLLM Proxy,并指定要使用的 SageMaker 模型:

endpoint_name="Sagemaker-inference-componet-mme3"
nohup litellm --model sagemaker/${endpoint_name} &

你也可以通过 YAML 配置文件来设置 SageMaker Inference Component 的相关参数。以下是一个示例配置:

model_list:
  - model_name: sagemaker_ds
    litellm_params:
      model: sagemaker/sagemaker/Sagemaker-inference-componet-mme3
      aws_region_name: ap-northeast-1
      model_id: IC-deepseek-r1-distill-qwen-32b-2025-03-11-14-30-02
      hf_model_name: deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B
      max_tokens: 1024

将上述配置保存为 config.yaml,然后使用以下命令启动 LiteLLM Proxy:

export LITELLM_LOG=DEBUG  nohup litellm --model sagemaker_ds --config ./config.yaml &

测试 LiteLLM Proxy

  • LiteLLM proxy 支持以 OpenAI 格式的 RESTful 请求调用后端的 SageMaker Endpoint,如下所示:
    InferenceComponentName="IC-deepseek-r1-distill-qwen-32b-2025-03-11-14-30-02"
    endpoint_name="Sagemaker-inference-componet-mme3"
    
    curl http://ec2-35-93-77-218.us-west-2.compute.amazonaws.com:4000/chat/completions \
        -H "Content-Type: application/json" \
        -d '{
                "model": "sagemaker/Sagemaker-inference-componet-mme3",
                "model_id": "IC-deepseek-r1-distill-qwen-32b-2025-03-11-14-30-02",
                "messages": [
                    {
                        "role": "system",
                        "content": "You are a helpful assistant."
                    },
                    {
                        "role": "user",
                        "content": "Hello!"
                    }
                ],
            }'
    
  • 同样我们可以用 OpenAI SDK 做兼容性测试:
    InferenceComponentName='IC-deepseek-r1-distill-qwen-32b-2025-02-28-05-59-27'
    endpoint_name="Sagemaker-inference-componet-mme3"
    
    import os 
    from litellm import completion
    import boto3
    
    
    recipe_food = """
    How to make cake?
    """
    
    import openai # openai v1.0.0+
    client = openai.OpenAI(api_key="dummy",
                           base_url="http://ec2-35-93-77-218.us-west-2.compute.amazonaws.com:4000") # set proxy to base_url
    # request sent to model set on litellm proxy, `litellm --model`
    response = client.chat.completions.create(
        model="deepseek-r1-qwen-32b",
        model_id="IC-deepseek-r1-distill-qwen-32b-2025-02-28-05-59-27", #如果在config.yaml 已配置model_id 这行可以不用加
        messages = [
        {
            "role": "user","content":prompt_template
        }],
        #stream=True,
        )
    
    print(response)
    
  • 当然也可以使用 LiteLLM completion 的 SDK,通过 model_id 传入 SageMaker Inference Component name
    InferenceComponentName='IC-deepseek-r1-distill-qwen-32b-2025-03-11-14-30-02'
    endpoint_name="Sagemaker-inference-componet-mme3"
    
    import os 
    from litellm import completion
    import boto3
    
    recipe_food = """
    How to make cake?
    """
    
    
    ## use completion sdk to pass component name via model id
    response = completion(
                model=f"sagemaker/{endpoint_name}", 
                model_id=InferenceComponentName,
                messages=[{ "content": prompt_template,"role": "user"}],
                temperature=0.2,
                max_tokens=80,
                #stream=True,
                aws_access_key_id="*****",
                aws_secret_access_key="*********",
                aws_region_name="us-west-2",
            )
    for part in response:
        print(part.choices[0].delta.content or "")
       
        
     ###result 
    >>> print(response['choices'][0]['message']['content'])
    Sure! Here's a classic vanilla cake recipe:
    
    **Ingredients:**
    - 1 ½ cups (300g) all-purpose flour
    - 1 cup (200g) granulated sugar
    - 1 ½ tsp baking powder
    - ½ tsp salt
    - 1 cup (240ml) whole milk
    - 2 large eggs
    -
    >>>
    

LiteLLM SageMaker 流式消息

SageMaker endpoint 的 LMI 镜像流式消息,需要在 vllm 打开 stream 选项

%%writefile serving.properties
engine=Python
option.trust_remote_code=True
option.tensor_parallel_degree=4
option.gpu_memory_utilization=.87
option.max_model_len=10240
option.model_id=deepseek-ai/DeepSeek-R1-Distill-Qwen-32B
option.max_rolling_batch_size=2
option.rolling_batch=vllm
option.enable_streaming=true
  • LiteLLM 代码中,因为 SageMaker 的 chunk encoder 编码方式,会在超过 chunk size 的时候截断,造成一个 token 单词分在不同的 chunk 中输出。
  • 因此在 LiteLLM 的 stream_handler中,对应 SageMaker LLM 的 CustomerStreamWapper 的相应代码,需要做相应的定制修改,具体如下:
    def __next__(self): 
       ....省略
    if (
                        isinstance(self.completion_stream, str)
                        or isinstance(self.completion_stream, bytes)
                        or isinstance(self.completion_stream, ModelResponse)
                    ):
                        chunk = self.completion_stream
                    ###增加判断逻辑,如果遇到sagemaker返回不是换行截断,则再取下一个chunk进行拼接
                    else:
                        chunk = next(self.completion_stream)
                    print("chunk ==",chunk)
                    if(not chunk.endswith('\n')):
                        next_chunk = next(self.completion_stream)
                        chunk += next_chunk
                    
    
  • 同时,在 Continue/Cline 等工具对接 LiteLLM 时,由于其调用的是 LiteLLM SageMaker handler 的异步的__anext__ 序列方法获取 SageMaker 的流式 chunk 输出,因此也需要处理 chunk encoder 编码截断的问题,具体如下:
    async for chunk in iterator:
                event_stream_buffer.add_data(chunk)
                for event in event_stream_buffer:
                    try:
                        message = self._parse_message_from_event(event)
                        if message:
                            verbose_logger.debug("sagemaker parsed chunk bytes %s", message)
                            # 移除 data: 前缀和 "\n\n" 结尾
                            message = (
                                litellm.CustomStreamWrapper._strip_sse_data_from_chunk(message)
                                or ""
                            )
                            message = message.replace("\n\n", "")
            
                            # 累积 JSON 数据
                            accumulated_json += message
            
                            # 尝试解析累积的 JSON
                            try:
                                _data = json.loads(accumulated_json)
                                if self.is_messages_api:
                                    yield self._chunk_parser_messages_api(chunk_data=_data)
                                else:
                                    yield self._chunk_parser(chunk_data=_data)
                                # 解析成功后重置累积的 JSON 数据
                                accumulated_json = ""
                            except json.JSONDecodeError:
                                # 如果还不是有效的 JSON,继续处理下一个事件
                                continue
                    except UnicodeDecodeError as e:
                        verbose_logger.warning(f"UnicodeDecodeError: {e}. Attempting to combine with next event.")
                        accumulated_json += "" # 跳过解析失败的event(sagemaker chunk encoder编码的截断符号)
                    except Exception as e:
                        verbose_logger.error(f"Error parsing message: {e}. Attempting to combine with next event.")
                        accumulated_json += "" # 跳过解析失败的event(sagemaker chunk encoder编码的截断符号)
            
            # 处理最后一个累积的 JSON 数据(如果有)
            if accumulated_json:
                try:
                    _data = json.loads(accumulated_json)
                    if self.is_messages_api:
                        yield self._chunk_parser_messages_api(chunk_data=_data)
                    else:
                        yield self._chunk_parser(chunk_data=_data)
                except json.JSONDecodeError as e:
                    verbose_logger.error(f"Final JSONDecodeError: {e}")
                except Exception as e:
                    verbose_logger.error(f"Final error parsing accumulated JSON: {e}")
    

     

  • 修改完毕后,通过 pip 在源代码目录下重新编译安装
    cd litellm
    pip install .  
    
  • 此时 LiteLLM 调用时,OpenAI 兼容 RESTful 参数中,传 stream 参数为 true
    2.compute.amazonaws.com:4000/chat/completions \
        -H "Content-Type: application/json" \
        -d '{
                "model": "sagemaker/Sagemaker-inference-componet-mme3",
                "model_id": "IC-deepseek-r1-distill-qwen-32b-2025-03-11-14-30-02",
                "messages": [
                    {
                        "role": "system",
                        "content": "You are a helpful assistant."
                    },
                    {
                        "role": "user",
                        "content": "I want to make coffee"
                    }
                ],
                "stream":true
            }'
    

从输出可以看出,已经是流式的 chunk,每个 chunk 是标准的 json 格式:

data: {"id":"chatcmpl-18f510a9-8c46-4e02-ab14-93582d40e1bc","created":1741732757,"model":"Sagemaker-inference-componet-mme3","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"content":".","role":"assistant"}}]}

data: {"id":"chatcmpl-18f510a9-8c46-4e02-ab14-93582d40e1bc","created":1741732757,"model":"Sagemaker-inference-componet-mme3","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"content":" What"}}]}

data: {"id":"chatcmpl-18f510a9-8c46-4e02-ab14-93582d40e1bc","created":1741732757,"model":"Sagemaker-inference-componet-mme3","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"content":" do"}}]}

data: {"id":"chatcmpl-18f510a9-8c46-4e02-ab14-93582d40e1bc","created":1741732757,"model":"Sagemaker-inference-componet-mme3","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"content":" I"}}]}

data: {"id":"chatcmpl-18f510a9-8c46-4e02-ab14-93582d40e1bc","created":1741732757,"model":"Sagemaker-inference-componet-mme3","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"content":" need"}}]}

data: {"id":"chatcmpl-18f510a9-8c46-4e02-ab14-93582d40e1bc","created":1741732757,"model":"Sagemaker-inference-componet-mme3","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"content":"?\n\n"}}]}

data: {"id":"chatcmpl-18f510a9-8c46-4e02-ab14-93582d40e1bc","created":1741732757,"model":"Sagemaker-inference-componet-mme3","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"content":"</think>"}}]}

data: {"id":"chatcmpl-18f510a9-8c46-4e02-ab14-93582d40e1bc","created":1741732757,"model":"Sagemaker-inference-componet-mme3","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"content":"\n\n"}}]}

总结

综上所述,通过 LiteLLM,SageMaker Inference Component 推理端点,构建一个统一的 MaaS 中台,客户不仅能够集成多种模型,还能充分利用高性能硬件,实现资源的高效管理。这种灵活且高效的架构,将为客户在人工智能领域的应用打下坚实的基础,助力其在竞争中脱颖而出。


*前述特定亚马逊云科技生成式人工智能相关的服务仅在亚马逊云科技海外区域可用,亚马逊云科技中国仅为帮助您了解行业前沿技术和发展海外业务选择推介该服务。

附录

本篇作者

唐清原

亚马逊云科技高级解决方案架构师,负责 Data Analytic & AIML 产品服务架构设计以及解决方案。10+数据领域研发及架构设计经验,历任 IBM 咨询顾问,Oracle 高级咨询顾问,澳新银行数据部领域架构师职务。在大数据 BI,数据湖,推荐系统,MLOps 等平台项目有丰富实战经验。

黎裕坚

亚马逊云科技高级解决方案架构师,负责基于亚马逊云科技的云计算方案架构的咨询和设计,同时致力于亚马逊云科技 GenAI 类服务的应用和推广。