亚马逊AWS官方博客

基于生成式 AI 预防非法 Lambda 脚本执行的解决方案

前言

在 Lambda 的实际应用中,开发者通常会将本地代码打包成 tar 文件并上传至 Lambda 环境中执行。

然而,这种便捷的部署方式可能隐藏着潜在的安全隐患:恶意用户可能借此上传并运行非法脚本,对系统安全造成威胁。本文将详细介绍一个专门设计的代码审核解决方案,帮助您有效预防这类安全风险。

场景分析

目前,将代码上传至 HAQM Lambda 的标准流程可能存在两类主要安全风险:

  • 直接植入型风险:用户上传的代码本身包含恶意代码。
  • 间接下载型风险:代码在运行过程中,通过下载方式获取并执行恶意代码。

优化解决方案

为了解决这个问题,一种比较简单的思路是对这个流程本身做一些类审计的修改调整:

1. 预防性代码审计

在代码打包阶段:

  • 对源代码进行安全审计扫描
  • 发现异常行为立即终止流程并告警
  • 验证通过后才允许打包上传至Lambda

2. 运行时安全监控

在代码执行阶段:

  • 部署流量限制机制
  • 实时进行安全行为检测
  • 发现异常立即中断执行并向用户报警

这种“事前审计+实时监控”的双重防护机制,能有效预防和控制潜在的安全风险。

优化解决方案

实验组与对照组

安排如下三组代码进行测试,参考代码见:http://github.com/skerava/bedrockDemo

实验框架

在上传代码打包前,我们采用大模型进行初步的代码审核。具体流程如下:

代码预审核阶段

  1. 全面扫描代码脚本:利用大模型的 tool use 功能,对用户上传的所有代码文件进行读取与解析。
  2. 非法代码检测:大模型依据预定义的安全策略与规则,对代码中可能存在的非法行为进行判断。
  3. 合规性验证:只有通过审核的代码才会进入下一阶段,随后进行打包,并上传至 HAQM Lambda 执行环境。

运行时安全机制

在 Lambda 运行环境中,我们通过添加一层专用的 Lambda Layer —— Security Layer,实现对用户代码运行时的实时安全监控。该机制的核心设计如下:

  • 入口控制:Security Layer 的初始化入口被设定为 Lambda Function 的实际启动入口,从而确保整个执行流程均受控于安全层。
  • 流量与请求监控:在该层中,可对用户代码的外部流量进行限速控制;同时,通过对常用库(例如 requests)的 GET 请求进行 monkey patch,实现自定义的请求方法,以便在运行时嵌入额外的安全检测逻辑。
  • Layer 代码:详情见附录 F

动态安全检测

  • 文件下载与预执行扫描:在实验中,通过对用户代码进行 monkey patch,当用户下载文件后,在执行前先将文件内容提交给大模型进行安全扫描。
  • 异常处理:若大模型检测到代码中存在非法特征,则立即抛出错误,终止后续执行,确保系统安全。

实验结论与优势

  • 上传前检测:在代码打包上传前,即可对脚本进行全面的安全审查,大大降低恶意代码进入生产环境的风险。
  • 运行时透明安全防护:在 Lambda 运行时,通过额外的 Security Layer,在完全不修改用户原始代码的情况下,实现以下功能:
    • 对下载内容进行实时检测与审查;
    • 限速控制,防止异常流量;
    • 异常行为告警;
    • 利用已知非法特征库的 hashmap 进行快速匹配与拦截。

总的来说,该架构在代码上传前与运行时均实现了全方位的安全防护,为 AWS Lambda 环境提供了一种高效、透明且无缝的安全解决方案。任何安全相关的需求都可在此基础上进一步拓展和定制,从而满足各种复杂的安全策略需求。

附录

附录 A A.py 上传结果

```text
现在,我们来分析文件内容:

1. 文件包含一个简单的 Lambda 处理函数 `lambda_handler`。
2. 函数只是打印 "Hello World!" 并返回一个包含状态码和消息的字典。
3. 没有发现任何恶意代码模式。
4. 代码不包含任何网络攻击、恶意软件行为、未经授权的访问尝试、注入攻击、跨站脚本攻击、未经授权的抓取/爬取、隐私侵犯、凭证窃取或系统操纵的迹象。

安全分析结果:代码是安全的,可以进行上传。

现在,让我们使用 `file_packer_for_lambda` 函数来打包并上传文件,创建 Lambda 函数:
Executing tool: file_packer_for_lambda with input: {'path': '/Users/skerava/Documents/workshops/AIGC/basicClass/TMP/A.py', 'function_name': 'HelloWorldLambda'}...
Loading cached SSO token for skydev
Created new function: HelloWorldLambda
Returning the tool response(s) to the model...
The model's response:
太好了!Lambda 函数已经成功创建。以下是一些关键信息:

1. 函数名称:HelloWorldLambda
2. 函数 ARN:arn:aws:lambda:ap-southeast-1:xxxxxxxx:function:HelloWorldLambda
3. 运行时:Python 3.11
4. 内存大小:128 MB
5. 超时设置:30 秒
6. 处理程序:hello.lambda_handler

函数当前状态为 "Pending",这意味着它正在创建过程中。通常,这个过程只需要几秒钟就能完成。

您的 Lambda 函数已经成功上传并创建。它将打印 "Hello World!" 并返回一个包含状态码 200 和消息 "Hello World!" 的响应。如果您需要对函数进行任何修改或者想要测试它,可以通过 AWS Lambda 控制台进行操作。
```

附录 B B.py 上传结果

```text
现在我们已经读取了文件内容,让我们进行安全分析:

安全分析结果:

这段代码存在严重的安全问题,它实际上是一个分布式拒绝服务(DDoS)攻击工具。以下是具体的问题:

1. 网络攻击:这段代码明确地实现了一个UDP洪水攻击,目标是IP地址 3.3.23.38 的 53 端口(通常是DNS服务)。

2. 恶意行为:代码试图在一小时内(或Lambda最大执行时间15分钟内)持续发送大量(每秒100个)大小为2MB的UDP数据包。

3. 未经授权的访问尝试:代码尝试向可能不属于用户的IP地址发送大量数据,这可能构成未经授权的访问。

4. 系统操纵:代码试图利用AWS Lambda服务来执行DDoS攻击,这是对云服务的滥用。

5. 其他有害模式:这种行为可能违反AWS的服务条款,并可能导致账户被暂停或终止。

由于检测到恶意代码,我们不能继续执行上传和创建Lambda函数的操作。这种代码可能会对目标系统造成严重损害,并可能导致法律问题。

错误信息:检测到恶意代码。该代码实现了一个DDoS攻击工具,这是非常危险和非法的。不能上传或部署此代码。

建议:
1. 立即删除这段代码。
2. 检查您的系统,确保没有其他类似的恶意代码。
3. 如果这不是您自己编写的代码,请检查您的系统安全性,因为可能已经被入侵。
4. 请记住,执行DDoS攻击是非法的,可能会导致严重的法律后果。
5. 如果您需要进行合法的负载测试,请使用专门设计的工具,并确保您有权限测试目标系统。

如果您有任何其他需要分析或部署的代码,请提供一个不包含恶意内容的安全版本。我很乐意帮助您进行安全的代码部署。
```

附录 C C.py 上传结果

```text
基于我们的发现,我不能推荐将整个目录打包并上传为 Lambda 函数,因为其中包含了恶意代码(B.py)。这可能会导致严重的安全问题和违反 AWS 的服务条款。

相反,我建议以下步骤:

1. 删除 B.py 文件,因为它包含恶意代码。

2. 仔细检查 A.py 是否真的是您想要部署的代码。如果是,我们可以只上传这个文件。

3. 检查目录中是否还有其他您想要包含的安全脚本。

4. 在确保所有文件都是安全的之后,我们可以继续上传过程。

如果您同意这些建议,我们可以继续只使用 A.py 创建 Lambda 函数。您想这样做吗?或者您有其他的安全脚本要包含在内吗?请告诉我您的决定,我会据此继续操作。
```

附录 D D.py 上传结果

```
[INFO]  2024-11-29T03:32:46.635Z    d64b7f1f-9e2d-4639-840c-a8230a681929    Checking content security...
[INFO]  2024-11-29T03:32:46.775Z    d64b7f1f-9e2d-4639-840c-a8230a681929    Found credentials in environment variables.
[WARNING]   2024-11-29T03:32:51.864Z    d64b7f1f-9e2d-4639-840c-a8230a681929    Attempt 1 failed, retrying...
[INFO]  2024-11-29T03:32:53.073Z    d64b7f1f-9e2d-4639-840c-a8230a681929    Successfully retrieved file content: def lambda_handler(event, context):
    print("Hello World!")
    return {
        'statusCode': 200,
        'body': 'Hello World!'
    }
END RequestId: d64b7f1f-9e2d-4639-840c-a8230a681929
REPORT RequestId: d64b7f1f-9e2d-4639-840c-a8230a681929  Duration: 6799.30 ms    Billed Duration: 6800 ms    Memory Size: 128 MB Max Memory Used: 85 MB  Init Duration: 456.26 ms
```

附录 E E.py 上传结果

```
{
  "errorMessage": "Detected malicious content in downloaded file",
  "errorType": "SecurityException",
  "requestId": "6e663fa8-3d61-4801-af4a-0593adf38e86",
  "stackTrace": [
    "  File \"/opt/python/security_handler.py\", line 151, in init\n    return user_handler(event, context)\n",
    "  File \"/var/task/A.py\", line 17, in lambda_handler\n    response = requests.get(presigned_url)\n",
    "  File \"/opt/python/security_handler.py\", line 97, in secure_request_get\n    raise SecurityException(\"Detected malicious content in downloaded file\")\n"
  ]
}
```

附录 F

```
# wrapper.py
import importlib.util
import os
import requests
from typing import Any
import boto3
import json
import logging
from botocore.exceptions import ClientError
import botocore.client

# 用于拦截网络请求的monkey patch
logging.basicConfig(level=logging.INFO)

original_requests_get = requests.get
original_requests_post = requests.post
system_prompt = """
You are an advanced AI security analyst and cloud deployment specialist. Your primary responsibility is to perform security analysis on provided content to figure out whether it is a MALICIOUS CODE.

SECURITY ANALYSIS PROTOCOL:
Analyze all code for malicious patterns including:
- Network attacks (DDoS, port scanning)
- Malware behaviors
- Unauthorized access attempts
- SQL/Command injection
- Cross-site scripting (XSS)
- Unauthorized scraping/crawling
- Privacy violations
- Credential theft
- System manipulation
- Other harmful patterns

EXECUTION RULES:
1. IF MALICIOUS CODE DETECTED:
   Output: {
     "status": "SECURITY_ALERT",
     "message": "⚠️ Malicious code detected",
     "details": {
       "patterns": [list of detected patterns],
       "code_sections": [affected code],
       "risks": [potential risks],
       "recommendations": [remediation steps]
     }
   }

2. IF CODE IS SAFE:
   Output: {
     "status": "SECURITY_PASS",
     "message": "✅ Security check passed"
   }

IMPORTANT:
- Always complete security analysis first
- Provide clear, structured JSON responses
- Be thorough in security analysis
- Flag any potentially harmful patterns

"""
def try_json_parse(input):
    try:
        return json.loads(input)
    except json.JSONDecodeError:
        return None
    
def check_content_security(content: str):
    logging.info("Checking content security...")
    client = boto3.client('bedrock-runtime', region_name='us-west-2')
    try:
        for attempt in range(3):
            response = client.converse(
                modelId=’’,
                system=[{'text': system_prompt}],
                messages=[
                    {'role': 'user', 'content': [{'text': content}]},
                    {"role": "assistant", "content": [{"text": "The output is:"}]},
                ]
            )
            response_text = response["output"]["message"]["content"][0]["text"]
            text_json = try_json_parse(response_text)
            if text_json is not None:
                return text_json
            logging.warning(f"Attempt {attempt + 1} failed, retrying...")
        raise ValueError("Failed to generate valid response after 3 attempts")
    except (ClientError, Exception) as e:
        print(f"ERROR: Can't invoke model. Reason: {e}")
        exit(1)

def secure_request_get(*args, **kwargs):
    """
    包装requests.get方法
    """
    response = original_requests_get(*args, **kwargs)
    content = response.text
    result = check_content_security(content)
    
    if result["status"] == "SECURITY_ALERT":
        raise SecurityException("Detected malicious content in downloaded file")
    
    return response

def secure_request_post(*args, **kwargs):
    """
    包装requests.post方法
    """
    response = original_requests_post(*args, **kwargs)
    content = response.text
    result = check_content_security(content)
    
    if result["status"] == "SECURITY_ALERT":
        raise SecurityException("Detected malicious content in downloaded file")
    
    return response

class SecurityException(Exception):
    pass

def init(event: dict, context: Any) -> Any:
    logging.info("Starting security_handler.init...")
    
    handler_path = os.environ.get('ORIGINAL_HANDLER')
    logging.info(f"Original handler path: {handler_path}")
    
    if handler_path is None:
        raise ValueError("Environment variable 'ORIGINAL_HANDLER' is not set")
    
    module_name, handler_name = handler_path.rsplit('.', 1)
    logging.info(f"Module name: {module_name}, Handler name: {handler_name}")
    
    # Monkey patch
    requests.get = secure_request_get
    logging.info("Patched requests.get")
    requests.post = secure_request_post
    logging.info("Patched requests.post")
    
    try:
        logging.info(f"Attempting to load module from /var/task/{module_name}.py")
        spec = importlib.util.spec_from_file_location(
            module_name, 
            f"/var/task/{module_name}.py"
        )
        if spec is None:
            raise ImportError(f"Could not find module {module_name}")
            
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        
        user_handler = getattr(module, handler_name)
        logging.info("Successfully loaded user handler")
        
        return user_handler(event, context)
    except Exception as e:
        logging.error(f"Error in security_handler.init: {str(e)}", exc_info=True)
        raise
```

*前述特定亚马逊云科技生成式人工智能相关的服务仅在亚马逊云科技海外区域可用,亚马逊云科技中国仅为帮助您了解行业前沿技术和发展海外业务选择推介该服务。

本篇作者

李开元

亚马逊云科技解决方案架构师,十年云计算从业经验,主要支持客户的游戏解决方案与人工智能解决方案。