AWS Shield Advanced:批量管理ALB防护的最佳实践
  ndCIWJ3N86Az 2023年11月19日 22 0

背景

AWS Shield Advanced是亚马逊网络服务(AWS)提供的一种高级DDoS(分布式拒绝服务)保护服务,旨在帮助用户保护其AWS基础架构免受DDoS攻击的影响。本文将深入探讨AWS Shield Advanced的功能和必要性,并介绍了开发AWS Shield Advanced批量添加ALB防护功能的优势和实现方式。

AWS Shield Advanced的功能和必要性

  1. DDoS保护:AWS Shield Advanced能够检测和缓解各种类型的DDoS攻击,包括网络层攻击、应用层攻击和协议层攻击。通过机器学习和实时流量分析,它识别和过滤恶意流量,确保正常流量能够正常访问您的应用程序。
  2. 自动化防护:提供自动化的DDoS防护,无需用户干预。它能够自动检测和缓解攻击,并在攻击发生时自动启动防护措施,快速响应攻击,减少对应用程序的影响。
  3. 实时监控和报警:AWS Shield Advanced提供实时监控和报警功能,帮助用户及时了解攻击情况。提供有关攻击类型、攻击流量和攻击来源的详细信息,以及实时报警通知,使用户能够快速采取措施来应对攻击。
  4. 高可用性:构建在AWS全球基础设施上,具有高可用性和弹性。能够保护全球范围内的应用程序免受DDoS攻击的影响,并确保应用程序的可用性和性能。

开发AWS Shield Advanced批量添加ALB防护的好处

  1. 简化管理:批量添加ALB防护功能可以简化管理过程,一次性为多个ALB启用防护,节省时间和精力,提高管理效率。
  2. 统一防护策略:通过批量添加ALB防护功能,用户可以为多个ALB应用相同的防护策略,确保一致的防护水平,并简化策略管理。
  3. 快速响应:批量添加ALB防护功能可以快速响应潜在的DDoS攻击。检测到攻击时,用户可以立即为所有受影响的ALB启用防护,降低攻击造成的损失。

前提条件

确保已在us-east-1安装区域安装了AWS Shield Advanced功能,并提前设置好本地开发环境的Access Key和Secret Key。

实现方式

以下是使用Python和Boto3库实现的示例代码:

## 开发环境

    Python 3.8.2

## 安装依赖包

    pip install boto3==1.26.105

## 用户权限

用户所需的权限,如下所示

```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "wafv2:ListResourcesForWebACL",
                "cloudwatch:PutMetricAlarm",
                "shield:AssociateHealthCheck",
                "elasticloadbalancing:DescribeLoadBalancers",
                "wafv2:ListWebACLs",
                "wafv2:AssociateWebACL",
                "route53:CreateHealthCheck",
                "shield:CreateProtection",
                "shield:EnableApplicationLayerAutomaticResponse",
                "shield:ListProtections",
                "apigateway:SetWebACL",
                "apigateway:GET",
                "iam:GetRole",
                "elasticloadbalancing:SetWebACL"
            ],
            "Resource": "*"
        }
    ]
}
```

## Shield批量添加ALB防护

主要是对外部提供服务的ALB添加Shield,新增保护返回状态200,空白说明已经受保护

```python
import boto3  # 导入boto3库,用于与AWS服务进行交互
from datetime import datetime  # 导入datetime模块,用于获取当前时间

def get_load_balancer_arns():
    elbv2 = boto3.client('elbv2', region_name='us-east-1')  # 创建elbv2客户端对象
    response = elbv2.describe_load_balancers()  # 调用describe_load_balancers方法获取负载均衡器信息
    load_balancer_arns = [lb['LoadBalancerArn'] for lb in response['LoadBalancers'] if
                          lb['Type'] == 'application' and not lb['Scheme'] == 'internal']  # 提取满足条件的负载均衡器ARN
    return load_balancer_arns

def create_cloudwatch_alarm(alarm_name, load_balancer_name):
    cloudwatch = boto3.client('cloudwatch', region_name='us-east-1')  # 创建cloudwatch客户端对象
    response = cloudwatch.put_metric_alarm(
        AlarmName=alarm_name,
        MetricName='TargetResponseTime',
        Namespace='AWS/ApplicationELB',
        Statistic='Average',
        Dimensions=[
            {
                'Name': 'LoadBalancer',
                'Value': load_balancer_name
            },
        ],
        Period=60,
        EvaluationPeriods=20,
        DatapointsToAlarm=1,
        Threshold=5.0,
        ComparisonOperator='GreaterThanOrEqualToThreshold',
        TreatMissingData='notBreaching',  # notBreaching缺失数据不告警;missing缺失数据告警
        Unit='Seconds'
    )  # 调用put_metric_alarm方法创建云监控告警

def create_route53_health_check(alarm_name):
    route53 = boto3.client('route53')  # 创建route53客户端对象
    current_time = datetime.now().strftime("%Y-%m-%d-%H:%M:%S")  # 获取当前时间
    response = route53.create_health_check(
        CallerReference=f'{current_time}',
        HealthCheckConfig={
            'Type': 'CLOUDWATCH_METRIC',
            'Inverted': False,
            'Disabled': False,
            'AlarmIdentifier': {
                'Region': 'us-east-1',
                'Name': f'{alarm_name}'
            },
            'InsufficientDataHealthStatus': 'LastKnownStatus'
        }
    )  # 调用create_health_check方法创建Route53健康检查
    health_check_arn = response["HealthCheck"]["Id"]  # 提取健康检查的ARN
    return health_check_arn

def is_alb_protected(alb_arn):
    shield = boto3.client('shield')  # 创建shield客户端对象
    response = shield.list_protections()  # 调用list_protections方法获取防护资源信息
    protected_resources = [protection['ResourceArn'] for protection in response['Protections']]  # 提取防护资源的ARN
    return alb_arn in protected_resources  # 判断负载均衡器是否在防护资源中

def create_shield_protection(alb_arn, protection_name):
    shield = boto3.client('shield')  # 创建shield客户端对象
    response = shield.create_protection(
        Name=protection_name,
        ResourceArn=alb_arn
    )  # 调用create_protection方法创建Shield防护
    return response

def protect_alb(alb_arn):
    protection_name = alb_arn.split("/")[-1]  # 提取负载均衡器名称
    response = create_shield_protection(alb_arn, protection_name)  # 调用create_shield_protection方法创建Shield防护
    protection_id = response['ProtectionId']  # 提取防护ID
    return protection_id

def associate_health_check(health_check_arn, protection_id):
    shield = boto3.client('shield')  # 创建shield客户端对象
    response = shield.associate_health_check(
        ProtectionId=protection_id,
        HealthCheckArn=health_check_arn
    )  # 调用associate_health_check方法关联健康检查和防护
    return response

def enable_shield_automatic_response(resource_arn, action):
    shield = boto3.client('shield')  # 创建shield客户端对象
    response = shield.enable_application_layer_automatic_response(ResourceArn=resource_arn, Action={f'{action}': {}})
    return response

# 使用示例
# 检查ALB是否受到保护
load_balancer_arns = get_load_balancer_arns()
for alb_arn in load_balancer_arns:
    alarm_name = f'TargetResponseTime-{alb_arn.split("/")[-1]}'  # 根据ARN生成告警名称
    load_balancer_name = '/'.join(alb_arn.split("/")[+1:])  # 提取负载均衡器名称
    is_protected = is_alb_protected(alb_arn)  # 判断负载均衡器是否受到保护
    if not is_protected:
        create_cloudwatch_alarm(alarm_name, load_balancer_name)  # 创建云监控告警
        health_check_id = create_route53_health_check(alarm_name)  # 创建Route53健康检查
        protection_id = protect_alb(alb_arn)  # 创建Shield防护
        health_check_arn = "arn:aws:route53:::healthcheck/" + health_check_id  # 构建健康检查的ARN
        response = associate_health_check(health_check_arn, protection_id)  # 关联健康检查和防护
        action = 'Count'
        response = enable_shield_automatic_response(alb_arn, action)  # 启用Shield自动响应
        print(response["ResponseMetadata"]["HTTPStatusCode"])  # 打印响应状态码
```

## Shield批量移除ALB防护

以下详解权限没有单独验证,笔者是使用管理员实现,移除返回状态200,空白说明没有可移除

```python
import boto3  # 导入boto3库

def get_load_balancer_arns():  # 定义获取负载均衡器ARN的函数
    elbv2 = boto3.client('elbv2', region_name='us-east-1')  # 创建elbv2客户端对象
    response = elbv2.describe_load_balancers()  # 调用describe_load_balancers方法获取负载均衡器信息
    load_balancer_arns = [lb['LoadBalancerArn'] for lb in response['LoadBalancers'] if
                          lb['Type'] == 'application' and not lb['Scheme'] == 'internal']  # 提取满足条件的负载均衡器ARN
    return load_balancer_arns  # 返回负载均衡器ARN列表

def describe_shield_protection(resource_arn):  # 定义描述Shield保护的函数
    shield = boto3.client('shield')  # 创建shield客户端对象
    response = shield.describe_protection(
        ResourceArn=resource_arn
    )  # 调用describe_protection方法获取资源的Shield保护信息
    return response  # 返回Shield保护信息

def delete_protection(protection_id):  # 定义删除Shield保护的函数
    shield = boto3.client('shield')  # 创建shield客户端对象
    response = shield.delete_protection(ProtectionId=f'{protection_id}')  # 调用delete_protection方法删除指定的Shield保护
    return response  # 返回删除结果

def delete_health_check(health_check_id):  # 定义删除健康检查的函数
    route53 = boto3.client('route53')  # 创建route53客户端对象
    response = route53.delete_health_check(
        HealthCheckId=health_check_id
    )  # 调用delete_health_check方法删除指定的健康检查
    return response  # 返回删除结果

def delete_alarms(alarm_name):  # 定义删除告警的函数
    cloudwatch = boto3.client('cloudwatch')  # 创建cloudwatch客户端对象
    response = cloudwatch.delete_alarms(
        AlarmNames=[
            f'{alarm_name}',
        ]
    )  # 调用delete_alarms方法删除指定的告警
    return response  # 返回删除结果

def is_alb_protected(alb_arn):  # 定义判断负载均衡器是否受到Shield保护的函数
    shield = boto3.client('shield')  # 创建shield客户端对象
    response = shield.list_protections()  # 调用list_protections方法获取所有的Shield保护信息
    protected_resources = [protection['ResourceArn'] for protection in response['Protections']]  # 提取所有受到Shield保护的资源ARN
    return alb_arn in protected_resources  # 返回负载均衡器ARN是否在受到Shield保护的资源列表中

# 使用示例
load_balancer_arns = get_load_balancer_arns()  # 获取负载均衡器ARN列表
for resource_arn in load_balancer_arns:  # 遍历负载均衡器ARN列表
    is_protected = is_alb_protected(resource_arn)  # 判断负载均衡器是否受到Shield保护
    if is_protected:  # 如果受到保护
        alarm_name = f'TargetResponseTime-{resource_arn.split("/")[-1]}'  # 根据ARN生成告警名称
        response = describe_shield_protection(resource_arn)  # 获取Shield保护信息
        protection_id = response['Protection']['Id']  # 获取Shield保护ID
        health_check_id = response['Protection']['HealthCheckIds'][0]  # 获取健康检查ID
        delete_protection(protection_id)  # 删除Shield保护
        delete_health_check(health_check_id)  # 删除健康检查
        delete_alarms(alarm_name)  # 删除告警
```

结论

AWS Shield Advanced提供了强大的DDoS保护功能,而通过开发批量管理ALB防护的功能,可以进一步简化管理过程,提供快速响应和统一的防护策略。这些功能和好处使得AWS Shield Advanced成为保护AWS基础架构免受DDoS攻击的重要工具。

安全和运维经验为本文提供了深刻的见解,希望读者能够根据文中的指导,更好地利用AWS Shield Advanced保护其应用程序。

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月19日 0

暂无评论

推荐阅读
ndCIWJ3N86Az