背景
AWS Shield Advanced是亚马逊网络服务(AWS)提供的一种高级DDoS(分布式拒绝服务)保护服务,旨在帮助用户保护其AWS基础架构免受DDoS攻击的影响。本文将深入探讨AWS Shield Advanced的功能和必要性,并介绍了开发AWS Shield Advanced批量添加ALB防护功能的优势和实现方式。
AWS Shield Advanced的功能和必要性
- DDoS保护:AWS Shield Advanced能够检测和缓解各种类型的DDoS攻击,包括网络层攻击、应用层攻击和协议层攻击。通过机器学习和实时流量分析,它识别和过滤恶意流量,确保正常流量能够正常访问您的应用程序。
- 自动化防护:提供自动化的DDoS防护,无需用户干预。它能够自动检测和缓解攻击,并在攻击发生时自动启动防护措施,快速响应攻击,减少对应用程序的影响。
- 实时监控和报警:AWS Shield Advanced提供实时监控和报警功能,帮助用户及时了解攻击情况。提供有关攻击类型、攻击流量和攻击来源的详细信息,以及实时报警通知,使用户能够快速采取措施来应对攻击。
- 高可用性:构建在AWS全球基础设施上,具有高可用性和弹性。能够保护全球范围内的应用程序免受DDoS攻击的影响,并确保应用程序的可用性和性能。
开发AWS Shield Advanced批量添加ALB防护的好处
- 简化管理:批量添加ALB防护功能可以简化管理过程,一次性为多个ALB启用防护,节省时间和精力,提高管理效率。
- 统一防护策略:通过批量添加ALB防护功能,用户可以为多个ALB应用相同的防护策略,确保一致的防护水平,并简化策略管理。
- 快速响应:批量添加ALB防护功能可以快速响应潜在的DDoS攻击。检测到攻击时,用户可以立即为所有受影响的ALB启用防护,降低攻击造成的损失。
前提条件
确保已在us-east-1安装区域安装了AWS Shield Advanced功能,并提前设置好本地开发环境的Access Key和Secret Key。
实现方式
以下是使用Python和Boto3库实现的示例代码:
## 开发环境
Python 3.8.2
## 安装依赖包
pip install boto3==1.26.105
## 用户权限
用户所需的权限,如下所示
```json
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": [
"wafv2:ListResourcesForWebACL",
"cloudwatch:PutMetricAlarm",
"shield:AssociateHealthCheck",
"elasticloadbalancing:DescribeLoadBalancers",
"wafv2:ListWebACLs",
"wafv2:AssociateWebACL",
"route53:CreateHealthCheck",
"shield:CreateProtection",
"shield:EnableApplicationLayerAutomaticResponse",
"shield:ListProtections",
"apigateway:SetWebACL",
"apigateway:GET",
"iam:GetRole",
"elasticloadbalancing:SetWebACL"
],
"Resource": "*"
}
]
}
```
## Shield批量添加ALB防护
主要是对外部提供服务的ALB添加Shield,新增保护返回状态200,空白说明已经受保护
```python
import boto3 # 导入boto3库,用于与AWS服务进行交互
from datetime import datetime # 导入datetime模块,用于获取当前时间
def get_load_balancer_arns():
elbv2 = boto3.client('elbv2', region_name='us-east-1') # 创建elbv2客户端对象
response = elbv2.describe_load_balancers() # 调用describe_load_balancers方法获取负载均衡器信息
load_balancer_arns = [lb['LoadBalancerArn'] for lb in response['LoadBalancers'] if
lb['Type'] == 'application' and not lb['Scheme'] == 'internal'] # 提取满足条件的负载均衡器ARN
return load_balancer_arns
def create_cloudwatch_alarm(alarm_name, load_balancer_name):
cloudwatch = boto3.client('cloudwatch', region_name='us-east-1') # 创建cloudwatch客户端对象
response = cloudwatch.put_metric_alarm(
AlarmName=alarm_name,
MetricName='TargetResponseTime',
Namespace='AWS/ApplicationELB',
Statistic='Average',
Dimensions=[
{
'Name': 'LoadBalancer',
'Value': load_balancer_name
},
],
Period=60,
EvaluationPeriods=20,
DatapointsToAlarm=1,
Threshold=5.0,
ComparisonOperator='GreaterThanOrEqualToThreshold',
TreatMissingData='notBreaching', # notBreaching缺失数据不告警;missing缺失数据告警
Unit='Seconds'
) # 调用put_metric_alarm方法创建云监控告警
def create_route53_health_check(alarm_name):
route53 = boto3.client('route53') # 创建route53客户端对象
current_time = datetime.now().strftime("%Y-%m-%d-%H:%M:%S") # 获取当前时间
response = route53.create_health_check(
CallerReference=f'{current_time}',
HealthCheckConfig={
'Type': 'CLOUDWATCH_METRIC',
'Inverted': False,
'Disabled': False,
'AlarmIdentifier': {
'Region': 'us-east-1',
'Name': f'{alarm_name}'
},
'InsufficientDataHealthStatus': 'LastKnownStatus'
}
) # 调用create_health_check方法创建Route53健康检查
health_check_arn = response["HealthCheck"]["Id"] # 提取健康检查的ARN
return health_check_arn
def is_alb_protected(alb_arn):
shield = boto3.client('shield') # 创建shield客户端对象
response = shield.list_protections() # 调用list_protections方法获取防护资源信息
protected_resources = [protection['ResourceArn'] for protection in response['Protections']] # 提取防护资源的ARN
return alb_arn in protected_resources # 判断负载均衡器是否在防护资源中
def create_shield_protection(alb_arn, protection_name):
shield = boto3.client('shield') # 创建shield客户端对象
response = shield.create_protection(
Name=protection_name,
ResourceArn=alb_arn
) # 调用create_protection方法创建Shield防护
return response
def protect_alb(alb_arn):
protection_name = alb_arn.split("/")[-1] # 提取负载均衡器名称
response = create_shield_protection(alb_arn, protection_name) # 调用create_shield_protection方法创建Shield防护
protection_id = response['ProtectionId'] # 提取防护ID
return protection_id
def associate_health_check(health_check_arn, protection_id):
shield = boto3.client('shield') # 创建shield客户端对象
response = shield.associate_health_check(
ProtectionId=protection_id,
HealthCheckArn=health_check_arn
) # 调用associate_health_check方法关联健康检查和防护
return response
def enable_shield_automatic_response(resource_arn, action):
shield = boto3.client('shield') # 创建shield客户端对象
response = shield.enable_application_layer_automatic_response(ResourceArn=resource_arn, Action={f'{action}': {}})
return response
# 使用示例
# 检查ALB是否受到保护
load_balancer_arns = get_load_balancer_arns()
for alb_arn in load_balancer_arns:
alarm_name = f'TargetResponseTime-{alb_arn.split("/")[-1]}' # 根据ARN生成告警名称
load_balancer_name = '/'.join(alb_arn.split("/")[+1:]) # 提取负载均衡器名称
is_protected = is_alb_protected(alb_arn) # 判断负载均衡器是否受到保护
if not is_protected:
create_cloudwatch_alarm(alarm_name, load_balancer_name) # 创建云监控告警
health_check_id = create_route53_health_check(alarm_name) # 创建Route53健康检查
protection_id = protect_alb(alb_arn) # 创建Shield防护
health_check_arn = "arn:aws:route53:::healthcheck/" + health_check_id # 构建健康检查的ARN
response = associate_health_check(health_check_arn, protection_id) # 关联健康检查和防护
action = 'Count'
response = enable_shield_automatic_response(alb_arn, action) # 启用Shield自动响应
print(response["ResponseMetadata"]["HTTPStatusCode"]) # 打印响应状态码
```
## Shield批量移除ALB防护
以下详解权限没有单独验证,笔者是使用管理员实现,移除返回状态200,空白说明没有可移除
```python
import boto3 # 导入boto3库
def get_load_balancer_arns(): # 定义获取负载均衡器ARN的函数
elbv2 = boto3.client('elbv2', region_name='us-east-1') # 创建elbv2客户端对象
response = elbv2.describe_load_balancers() # 调用describe_load_balancers方法获取负载均衡器信息
load_balancer_arns = [lb['LoadBalancerArn'] for lb in response['LoadBalancers'] if
lb['Type'] == 'application' and not lb['Scheme'] == 'internal'] # 提取满足条件的负载均衡器ARN
return load_balancer_arns # 返回负载均衡器ARN列表
def describe_shield_protection(resource_arn): # 定义描述Shield保护的函数
shield = boto3.client('shield') # 创建shield客户端对象
response = shield.describe_protection(
ResourceArn=resource_arn
) # 调用describe_protection方法获取资源的Shield保护信息
return response # 返回Shield保护信息
def delete_protection(protection_id): # 定义删除Shield保护的函数
shield = boto3.client('shield') # 创建shield客户端对象
response = shield.delete_protection(ProtectionId=f'{protection_id}') # 调用delete_protection方法删除指定的Shield保护
return response # 返回删除结果
def delete_health_check(health_check_id): # 定义删除健康检查的函数
route53 = boto3.client('route53') # 创建route53客户端对象
response = route53.delete_health_check(
HealthCheckId=health_check_id
) # 调用delete_health_check方法删除指定的健康检查
return response # 返回删除结果
def delete_alarms(alarm_name): # 定义删除告警的函数
cloudwatch = boto3.client('cloudwatch') # 创建cloudwatch客户端对象
response = cloudwatch.delete_alarms(
AlarmNames=[
f'{alarm_name}',
]
) # 调用delete_alarms方法删除指定的告警
return response # 返回删除结果
def is_alb_protected(alb_arn): # 定义判断负载均衡器是否受到Shield保护的函数
shield = boto3.client('shield') # 创建shield客户端对象
response = shield.list_protections() # 调用list_protections方法获取所有的Shield保护信息
protected_resources = [protection['ResourceArn'] for protection in response['Protections']] # 提取所有受到Shield保护的资源ARN
return alb_arn in protected_resources # 返回负载均衡器ARN是否在受到Shield保护的资源列表中
# 使用示例
load_balancer_arns = get_load_balancer_arns() # 获取负载均衡器ARN列表
for resource_arn in load_balancer_arns: # 遍历负载均衡器ARN列表
is_protected = is_alb_protected(resource_arn) # 判断负载均衡器是否受到Shield保护
if is_protected: # 如果受到保护
alarm_name = f'TargetResponseTime-{resource_arn.split("/")[-1]}' # 根据ARN生成告警名称
response = describe_shield_protection(resource_arn) # 获取Shield保护信息
protection_id = response['Protection']['Id'] # 获取Shield保护ID
health_check_id = response['Protection']['HealthCheckIds'][0] # 获取健康检查ID
delete_protection(protection_id) # 删除Shield保护
delete_health_check(health_check_id) # 删除健康检查
delete_alarms(alarm_name) # 删除告警
```
结论
AWS Shield Advanced提供了强大的DDoS保护功能,而通过开发批量管理ALB防护的功能,可以进一步简化管理过程,提供快速响应和统一的防护策略。这些功能和好处使得AWS Shield Advanced成为保护AWS基础架构免受DDoS攻击的重要工具。
安全和运维经验为本文提供了深刻的见解,希望读者能够根据文中的指导,更好地利用AWS Shield Advanced保护其应用程序。