业务背景与安全挑战
随着全球能源转型的加速推进,新能源领域正经历前所未有的发展机遇。在这一背景下,越来越多的企业选择将核心业务系统迁移上云,以期获得更高的弹性扩展能力和运维效率。本文案例中的新能源领域头部企业,基于业务合规要求,将其在欧洲的核心业务系统从其他平台迁移至 AWS。
然而,云资源的高度虚拟化和灵活性,也为企业带来了新的安全管理挑战。在新能源行业,数据不仅关系到企业的商业机密,更涉及能源基础设施的安全运行。当关键资源被意外或未经授权删除时,可能导致业务中断、数据丢失,甚至引发更广泛的安全隐患。
云平台迁移过程中,运维团队通常需要执行大量资源创建、配置和删除操作。如何确保这些高风险操作都在可控范围内,并能及时发现潜在安全威胁,成为企业云安全治理的关键环节。
无服务器安全监控架构设计
在分析企业核心安全诉求后,我们设计了一套基于 AWS 原生服务的无服务器安全监控方案。该架构充分利用 AWS 云原生能力,实现了从事件捕获到告警推送的全流程自动化,同时保持了极高的成本效益比。
此安全监控体系的设计理念基于以下四大核心目标:
实时性:通过事件驱动模型,确保高危操作能在秒级被发现并触发告警
精准性:利用 EventBridge 的事件模式匹配功能,精确识别需要监控的资源操作类型
低维护成本:完全基于 Serverless 架构,无需管理服务器,按实际触发次数计费
高扩展性:监控范围和告警规则可轻松扩展,适应不断变化的安全需求
整体架构采用了”事件源 → 事件处理 → 消息通知”的流水线模式,确保系统各组件间低耦合、高内聚,便于后续功能扩展和维护。
技术组件与工作流程详解
本监控告警体系由以下核心组件构成,形成完整的事件处理流水线:
1. CloudTrail – 作为整个监控体系的事件源,CloudTrail 记录 AWS 账户内的所有 API 调用,包括管理控制台操作、SDK 调用及 CLI 指令执行。特别是针对资源删除类高危操作,CloudTrail 提供了详尽的执行者身份、操作时间、资源标识等审计数据。
2. EventBridge – 担任事件路由器角色,通过预设的事件模式规则,实时监测 CloudTrail 日志流中的特定模式。在本方案中,我们主要过滤捕获包含`Delete`、`Remove`、`Terminate`等关键动作的管理事件。
3. SNS (Simple Notification Service) – 作为事件分发中心,接收 EventBridge 转发的高危操作事件,并触发后续处理流程。SNS的发布-订阅模型允许同一事件被多个下游服务并行处理,为未来功能扩展奠定基础。
4. Lambda 函数 – 充当告警处理引擎,订阅 SNS 主题并在接收到事件通知后执行:
- 解析事件详情,提取关键信息
- 根据资源类型构建结构化告警内容
- 调用钉钉开放 API,发送格式化告警消息
- 记录告警发送状态与结果
5. 钉钉群组 – 最终告警接收终端,通过自定义机器人接收并展示高危操作告警,支持@相关负责人,确保问题能被及时关注和处理。
当管理员或应用程序执行高危删除操作时,从事件产生到告警送达的整个流程完全自动化,无需人工干预。这种事件驱动的架构不仅响应迅速,而且资源利用高效,真正做到按需付费,避免了传统监控系统的资源闲置问题。
通过这套完整的监控告警机制,企业可以在 AWS 云环境中建立有效的安全防护网,及时发现并应对潜在的资源安全风险,保障业务系统的稳定运行。
技术实现与部署流程
1. 配置消息通知主题(SNS)
首先,我们需要创建 SNS(Simple Notification Service)主题,作为事件传递的中转站。SNS 主题将作为 EventBridge 规则的目标,接收所有符合条件的高危操作事件,并触发后续处理流程。
SNS 主题配置完成后,为其添加访问策略,确保 EventBridge 服务有权限向该主题发布消息。这一步骤为事件通知链路的第一环,奠定了消息传递的基础。
2. 创建 EventBridge 规则
接下来,在 AWS EventBridge 控制台创建规则,用于捕获和过滤关键的资源操作事件。规则通过事件模式匹配从 CloudTrail 流入的 API 调用日志,筛选出需要监控的高危操作。以下是 EventBridge 规则事件模式的 JSON 代码示例,可根据自己的需求灵活调整 eventSource 和 eventName:
{
"source": ["aws.s3", "aws.ec2", "aws.iam", "aws.elasticloadbalancing"],
"detail-type": ["AWS API Call via CloudTrail"],
"detail": {
"eventSource": ["s3.amazonaws.com", "ec2.amazonaws.com", "iam.amazonaws.com", "elasticloadbalancing.amazonaws.com"],
"eventName": ["DeleteBucket", "DeleteUser", "DeleteLoadBalancer", "DeleteSecurityGroup", "AuthorizeSecurityGroupIngress", "AuthorizeSecurityGroupEgress", "RevokeSecurityGroupIngress", "RevokeSecurityGroupEgress"]
}
}
此事件模式定义了我们重点关注的服务(如 S3、EC2、IAM、负载均衡器),事件类型(AWS API 调用),以及具体的事件名称(如删除存储桶、删除用户等)。规则创建后,将 SNS 主题设置为其目标,形成事件捕获到消息触发的流程链路,EventBridge 将会触发相应的操作。
注:IAM User 的操作事件记录在 us-east-1 Region Cloudtrail, 若要对 IAM 的操作实行同样的监控告警,需要在 us-east-1 Region 重复部署该方案。
3. 创建事件处理函数(Lambda)
Lambda 函数是整个告警系统的核心处理单元,负责接收事件通知、解析关键信息、格式化告警内容并推送至钉钉群组。创建一个 Lambda 函数,该函数将作为 SNS 主题的订阅者,接收 SNS 发送的事件消息。以下是 Lambda 函数的 Python 代码实现:
import json
import urllib3
import os
import logging
import time
import hmac
import base64
import urllib.parse
import hashlib
from datetime import datetime, timedelta, timezone
# 配置日志记录
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# 调试开关(True 表示开启调试模式,False 表示关闭)
DEBUG_MODE = os.environ.get('DEBUG_MODE', 'False').lower() == 'true'
# 只通知删除类事件(True 仅通知删除操作,False 允许所有事件)
NOTIFY_DELETE_EVENTS_ONLY = os.environ.get('NOTIFY_DELETE_EVENTS_ONLY', 'True').lower() == 'False'
# 钉钉机器人 Webhook 地址(从环境变量中获取)
DINGTALK_WEBHOOK = os.environ.get('DINGTALK_WEBHOOK')
# 钉钉机器人密钥(从环境变量中获取,用于加签)
DINGTALK_SECRET = os.environ.get('DINGTALK_SECRET')
# 钉钉消息图片链接(从环境变量中获取)
BANNER_IMAGE_URL = os.environ.get('BANNER_IMAGE_URL')
def lambda_handler(event, context):
# 记录接收到的事件
logger.info('接收到的事件: %s', json.dumps(event))
try:
# 解析 SNS 消息
if 'Records' in event:
sns_message = json.loads(event['Records'][0]['Sns']['Message'])
else:
sns_message = event
logger.info('解析的 SNS 消息: %s', json.dumps(sns_message))
# 提取关键信息
detail = sns_message['detail']
event_time = detail.get('eventTime', '未知时间')
event_name = detail.get('eventName', '未知操作')
# **如果启用了删除类事件通知开关,则跳过非删除操作**
delete_events = ['DeleteSecurityGroup', 'RevokeSecurityGroupIngress', 'RevokeSecurityGroupEgress', 'DeleteLoadBalancer']
if NOTIFY_DELETE_EVENTS_ONLY and event_name not in delete_events:
logger.info('非删除类事件,跳过通知: %s', event_name)
return {
'statusCode': 200,
'body': json.dumps(f'跳过非删除类事件: {event_name}')
}
user_identity = detail['userIdentity']
user_type = user_identity.get('type', '')
user_name = user_identity.get('userName', '未知用户')
# 如果是 AssumedRole,则设置用户名称为 "假借角色"
if user_type == 'AssumedRole':
user_name = "假借角色"
user_arn = user_identity.get('arn', '未知用户 ARN')
source_ip = detail.get('sourceIPAddress', '未知 IP')
region = sns_message.get('region', '未知区域')
deleted_resource = extract_deleted_resource(detail)
# 转换时间为+8时区
event_time = convert_to_utc8(event_time)
# 构造钉钉消息内容
message = f"\n" \
f"- **事件时间**: {event_time}\n" \
f"- **操作名称**: {event_name}\n" \
f"- **用户名称**: {user_name}\n" \
f"- **用户 ARN**: {user_arn}\n" \
f"- **源 IP 地址**: {source_ip}\n" \
f"- **区域**: {region}\n" \
f"- **被操作资源**: {deleted_resource}\n\n"
if DEBUG_MODE:
logger.info('构造的钉钉消息内容: %s', message)
# 发送消息到钉钉
send_dingtalk_message(message)
except Exception as e:
logger.error('处理事件时发生错误: %s', str(e), exc_info=True)
return {
'statusCode': 500,
'body': json.dumps(f'处理事件时发生错误: {str(e)}')
}
return {
'statusCode': 200,
'body': json.dumps('消息发送成功')
}
def convert_to_utc8(time_str):
"""
将时间字符串转换为+8时区
"""
if time_str == '未知时间':
return time_str
try:
# 尝试解析多种时间格式
dt = datetime.fromisoformat(time_str.replace('Z', '+00:00'))
except ValueError:
try:
# 如果 ISO 格式解析失败,尝试其他常见格式
dt = datetime.strptime(time_str, '%Y-%m-%dT%H:%M:%S.%fZ')
except ValueError:
return time_str
dt_utc8 = dt.astimezone(timezone(timedelta(hours=8)))
return dt_utc8.strftime('%Y-%m-%d %H:%M:%S')
def extract_deleted_resource(detail):
"""
提取被删除或被修改的资源信息
"""
resources = detail.get('resources', [])
for resource in resources:
if 'ARN' in resource:
return resource['ARN']
request_parameters = detail.get('requestParameters', {})
response_elements = detail.get('responseElements', {})
event_name = detail.get('eventName', '')
# 处理 response_elements 可能为 None 的情况
if response_elements is None:
response_elements = {}
# 处理撤销安全组规则
if event_name in ['RevokeSecurityGroupIngress', 'RevokeSecurityGroupEgress']:
security_group_id = request_parameters.get('groupId', '未知安全组')
rule_details = []
# 优先检查 responseElements['revokedSecurityGroupRuleSet']
revoked_rules = response_elements.get('revokedSecurityGroupRuleSet', {}).get('items', [])
if not revoked_rules:
revoked_rules = request_parameters.get('ipPermissions', {}).get('items', [])
for rule in revoked_rules:
ip_protocol = rule.get('ipProtocol', '未知协议')
from_port = rule.get('fromPort', '未知端口')
to_port = rule.get('toPort', '未知端口')
# 检查 IP 范围(优先使用 cidrIpv4 / cidrIpv6)
cidr_ip_list = [ip.get('cidrIp', '未知 IP') for ip in rule.get('ipRanges', {}).get('items', [])]
if 'cidrIpv4' in rule:
cidr_ip_list.append(rule['cidrIpv4'])
if 'cidrIpv6' in rule:
cidr_ip_list.append(rule['cidrIpv6'])
cidr_ip = ', '.join(cidr_ip_list) if cidr_ip_list else '未知 IP'
rule_details.append(f"协议: {ip_protocol}, 端口: {from_port}-{to_port}, 被撤销 IP: {cidr_ip}")
rule_info = "; ".join(rule_details) if rule_details else "未知规则"
return f"安全组: {security_group_id}, 被撤销规则: {rule_info}"
elif event_name in ['AuthorizeSecurityGroupIngress', 'AuthorizeSecurityGroupEgress']:
if 'groupId' in request_parameters:
rule_details = []
for rule in request_parameters.get('ipPermissions', {}).get('items', []):
ip_protocol = rule.get('ipProtocol', '未知协议')
from_port = rule.get('fromPort', '未知端口')
to_port = rule.get('toPort', '未知端口')
cidr_ip = ', '.join([ip.get('cidrIp', '未知 IP') for ip in rule.get('ipRanges', {}).get('items', [])])
rule_details.append(f"协议: {ip_protocol}, 端口: {from_port}-{to_port}, 允许 IP: {cidr_ip}")
rule_info = "; ".join(rule_details) if rule_details else "未知规则"
return f"安全组: {request_parameters['groupId']}, 新增规则: {rule_info}"
elif event_name == 'DeleteSecurityGroup' and 'groupId' in request_parameters:
return f"安全组: {request_parameters['groupId']} 已被删除"
elif event_name == 'DeleteLoadBalancer' and 'loadBalancerArn' in request_parameters:
return request_parameters['loadBalancerArn']
elif event_name == 'DeleteUser' and 'userName' in request_parameters:
return f"IAM 用户 {request_parameters['userName']} 已被删除"
elif 'key' in request_parameters:
return request_parameters['key']
elif 'name' in request_parameters:
return request_parameters['name']
elif 'ARN' in response_elements:
return response_elements['ARN']
return '未知资源'
def send_dingtalk_message(content):
"""
发送消息到钉钉,包含加签逻辑
"""
# 获取当前时间戳(毫秒)
timestamp = str(round(time.time() * 1000))
# 计算签名
secret_enc = DINGTALK_SECRET.encode('utf-8')
string_to_sign = f'{timestamp}\n{DINGTALK_SECRET}'
string_to_sign_enc = string_to_sign.encode('utf-8')
hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
# 调试模式下输出签名和时间戳
if DEBUG_MODE:
logger.info('钉钉消息签名: timestamp=%s, sign=%s', timestamp, sign)
# 构造带签名的 URL
webhook_url = f"{DINGTALK_WEBHOOK}×tamp={timestamp}&sign={sign}"
headers = {'Content-Type': 'application/json'}
payload = {
"msgtype": "markdown",
"markdown": {
"title": "AWS 操作通知",
"text": content
}
}
http = urllib3.PoolManager()
response = http.request('POST', webhook_url, body=json.dumps(payload), headers=headers)
if response.status != 200:
response_data = json.loads(response.data.decode('utf-8'))
error_msg = f"发送钉钉消息失败: 状态码={response.status}, 错误信息={response_data.get('errmsg', '未知错误')}"
logger.error(error_msg)
raise Exception(error_msg)
logger.info('钉钉消息发送成功')
4. Lambda 函数实现细节
4.1 事件解析与信息提取
函数首先解析从 SNS 接收的事件消息,提取事件发生时间、操作类型、执行者身份等关键信息,这些信息包括事件时间、操作名称、用户信息、源 IP 地址、区域以及被操作的资源等。通过这些信息,我们可以构建出一条清晰且详细的通知消息,为后续告警内容构建做准备。
4.2 判断执行者类型
在 Lambda 函数中,为了清晰呈现谁执行了高危操作,函数会分析事件中的 userIdentity 字段,区分是普通 IAM 用户(user)还是通过角色(role)执行的操作,这种区分有助于安全团队快速判断操作来源,尤其在使用跨账户角色或服务角色场景下。
user_identity = detail['userIdentity']
user_type = user_identity.get('type', '')
user_name = user_identity.get('userName', '未知用户')
# 如果是 AssumedRole,则设置用户名称为 "假借角色"
if user_type == 'AssumedRole':
user_name = "假借角色"
user_arn = user_identity.get('arn', '未知用户 ARN')
在这里,我们通过 userIdentity
中的 type
字段来判断执行者的类型。type
通常包含用户或角色的标识信息,我们通过分割该字段来获取执行者的类型。
4.3 时区本地化处理
由于 AWS 事件的时间戳默认为 UTC 时间,而我们通常需要将其转换为本地时区(如东八区)以便于阅读。convert_to_utc8
函数实现了这一功能。
4.4 资源操作详情提取
针对不同类型的高危操作,extract_deleted_resource
函数实现了智能解析逻辑,能精确识别被操作的资源及其详情,这可能包括 S3 存储桶名称、IAM 用户名称、负载均衡器 ARN 等。对于安全组规则的修改,该函数还会详细解析被撤销或新增的规则信息。
def extract_deleted_resource(detail):
"""
提取被删除或被修改的资源信息
"""
resources = detail.get('resources', [])
for resource in resources:
if 'ARN' in resource:
return resource['ARN']
request_parameters = detail.get('requestParameters', {})
response_elements = detail.get('responseElements', {})
event_name = detail.get('eventName', '')
# 处理 response_elements 可能为 None 的情况
if response_elements is None:
response_elements = {}
# 处理撤销安全组规则
if event_name in ['RevokeSecurityGroupIngress', 'RevokeSecurityGroupEgress']:
security_group_id = request_parameters.get('groupId', '未知安全组')
rule_details = []
# 优先检查 responseElements['revokedSecurityGroupRuleSet']
revoked_rules = response_elements.get('revokedSecurityGroupRuleSet', {}).get('items', [])
if not revoked_rules:
revoked_rules = request_parameters.get('ipPermissions', {}).get('items', [])
for rule in revoked_rules:
ip_protocol = rule.get('ipProtocol', '未知协议')
from_port = rule.get('fromPort', '未知端口')
to_port = rule.get('toPort', '未知端口')
# 检查 IP 范围(优先使用 cidrIpv4 / cidrIpv6)
cidr_ip_list = [ip.get('cidrIp', '未知 IP') for ip in rule.get('ipRanges', {}).get('items', [])]
if 'cidrIpv4' in rule:
cidr_ip_list.append(rule['cidrIpv4'])
if 'cidrIpv6' in rule:
cidr_ip_list.append(rule['cidrIpv6'])
cidr_ip = ', '.join(cidr_ip_list) if cidr_ip_list else '未知 IP'
rule_details.append(f"协议: {ip_protocol}, 端口: {from_port}-{to_port}, 被撤销 IP: {cidr_ip}")
rule_info = "; ".join(rule_details) if rule_details else "未知规则"
return f"安全组: {security_group_id}, 被撤销规则: {rule_info}"
elif event_name in ['AuthorizeSecurityGroupIngress', 'AuthorizeSecurityGroupEgress']:
if 'groupId' in request_parameters:
rule_details = []
for rule in request_parameters.get('ipPermissions', {}).get('items', []):
ip_protocol = rule.get('ipProtocol', '未知协议')
from_port = rule.get('fromPort', '未知端口')
to_port = rule.get('toPort', '未知端口')
cidr_ip = ', '.join([ip.get('cidrIp', '未知 IP') for ip in rule.get('ipRanges', {}).get('items', [])])
rule_details.append(f"协议: {ip_protocol}, 端口: {from_port}-{to_port}, 允许 IP: {cidr_ip}")
rule_info = "; ".join(rule_details) if rule_details else "未知规则"
return f"安全组: {request_parameters['groupId']}, 新增规则: {rule_info}"
elif event_name == 'DeleteSecurityGroup' and 'groupId' in request_parameters:
return f"安全组: {request_parameters['groupId']} 已被删除"
elif event_name == 'DeleteLoadBalancer' and 'loadBalancerArn' in request_parameters:
return request_parameters['loadBalancerArn']
elif event_name == 'DeleteUser' and 'userName' in request_parameters:
return f"IAM 用户 {request_parameters['userName']} 已被删除"
elif 'key' in request_parameters:
return request_parameters['key']
elif 'name' in request_parameters:
return request_parameters['name']
elif 'ARN' in response_elements:
return response_elements['ARN']
return '未知资源'
4.5 构造告警通知
在构造钉钉消息内容时,我们将上述相关信息包含在内,并格式化输出,告警消息采用 Markdown 格式,包含操作时间、类型、执行者、资源详情等关键信息,并可选添加自定义 banner 图片,提升视觉识别效果:
message = f"\n" \
f"- **事件时间**: {event_time}\n" \
f"- **操作名称**: {event_name}\n" \
f"- **用户名称**: {user_name}\n" \
f"- **用户 ARN**: {user_arn}\n" \
f"- **源 IP 地址**: {source_ip}\n" \
f"- **区域**: {region}\n" \
f"- **被操作资源**: {deleted_resource}\n\n"
5. 发送消息到钉钉
最后一步是将格式化的告警信息推送至钉钉工作群,让运维团队能第一时间获知高危操作。为了将告警通知发送到钉钉,我们需要先在钉钉消息群中创建一个机器人,并获取其 Webhook 地址和密钥。在 Lambda 函数中,我们通过以下方式与钉钉建立连接:
# 钉钉机器人 Webhook 地址(从环境变量中获取)
DINGTALK_WEBHOOK = os.environ.get('DINGTALK_WEBHOOK')
# 钉钉机器人密钥(从环境变量中获取,用于加签)
DINGTALK_SECRET = os.environ.get('DINGTALK_SECRET')
在 Lambda 函数中,我们使用 os.environ.get
方法从环境变量中获取钉钉机器人的 Webhook 地址和密钥。这些信息在 Lambda 函数的配置中进行设置,确保了敏感信息的安全性。
接下来,我们通过以下步骤发送消息到钉钉:
- 获取当前时间戳(毫秒)。
- 计算签名,使用钉钉机器人密钥对时间戳进行加密。
- 构造带签名的 URL,将时间戳和签名作为参数附加到 Webhook 地址上。
- 构造消息负载,包括消息类型和内容(Markdown)。
- 使用 HTTP POST 方法将消息发送到钉钉机器人。
def send_dingtalk_message(content):
"""
发送消息到钉钉,包含加签逻辑
"""
# 获取当前时间戳(毫秒)
timestamp = str(round(time.time() * 1000))
# 计算签名
secret_enc = DINGTALK_SECRET.encode('utf-8')
string_to_sign = f'{timestamp}\n{DINGTALK_SECRET}'
string_to_sign_enc = string_to_sign.encode('utf-8')
hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
# 构造带签名的 URL
webhook_url = f"{DINGTALK_WEBHOOK}×tamp={timestamp}&sign={sign}"
headers = {'Content-Type': 'application/json'}
payload = {
"msgtype": "markdown",
"markdown": {
"title": "AWS 操作通知",
"text": content
}
}
http = urllib3.PoolManager()
response = http.request('POST', webhook_url, body=json.dumps(payload), headers=headers)
if response.status != 200:
response_data = json.loads(response.data.decode('utf-8'))
error_msg = f"发送钉钉消息失败: 状态码={response.status}, 错误信息={response_data.get('errmsg', '未知错误')}"
logger.error(error_msg)
raise Exception(error_msg)
logger.info('钉钉消息发送成功')
钉钉机器人收到告警后,会在群组中即时展示高危操作详情,确保安全团队能快速响应并采取必要措施,最大限度降低误操作或恶意行为带来的风险。
结论
通过部署这套基于 AWS Serverless 架构的高危操作监控体系,新能源企业成功实现了云资源安全态势的全面可视化管理。该方案结合 EventBridge 事件过滤、Lambda 函数处理与钉钉实时推送的无服务器架构,不仅降低了运维成本,更在实际应用中证明了其高效性与可靠性。系统上线后,企业安全团队能够在平均 10 秒内获知任何高风险操作,将潜在风险响应时间从原先的数小时缩短至分钟级别。
此架构的核心价值在于既满足了新能源行业严格的合规要求,又通过事件驱动模式实现了精准的资源变更监控。特别是对于安全组规则、IAM 权限和核心业务资源的删除操作,钉钉告警提供的详细上下文信息,使安全审计与应急响应流程得到显著优化。
随着企业云基础设施的持续演进,该监控告警体系还可进一步扩展:
- 整合安全评分系统,对高危操作进行风险自动评级
- 引入 AI 分析模块,识别异常操作模式和潜在安全威胁
- 开发自动化修复功能,针对特定误操作实现快速回滚
- 扩展监控范围至更多 AWS 服务和资源类型
这种轻量级、低耦合的 Serverless 安全架构,不仅是云资源操作可视化的有效解决方案,更代表了云原生时代安全治理的最佳实践路径。企业可根据自身业务特点和安全需求,基于此框架灵活构建符合行业标准的全方位云资源安全防护体系。
本篇作者