When developing containerized applications, Prisma Cloud gives our container environment comprehensive runtime protection. During development and testing, however, we sometimes need to remove that protection temporarily. This article looks at why one might remove Prisma Cloud protection from AWS ECS Fargate services and walks through the Boto3 code that does it: export the current task definition, strip the Prisma Cloud Defender from it, register the result as a new revision, and point the service at that revision.
Why remove Prisma Cloud protection?
1. Flexibility and efficiency
During development and testing we need more freedom to adjust and test containers. Prisma Cloud's protection can be too strict in some of these situations and slow down rapid iteration on a container.
2. Performance tuning
In performance-sensitive scenarios, the Defender's additional inspection and enforcement can affect application performance. Removing protection temporarily makes it possible to tune and benchmark the application on its own.
3. Rapid container iteration
Fast iteration on containerized applications is a major trend in current development practice. It means modifying and redeploying containers frequently, and removing Prisma Cloud protection during that phase makes the cycle smoother.
Removing Prisma Cloud from AWS ECS Fargate: implementation
Export and modify the task definition
For a protected service, we export its current task definition, delete the fields that cannot be re-registered, update the container definitions, and produce a new task definition without the Prisma Cloud Defender. On Fargate the Defender runs as a sidecar container (named TwistlockDefender here) that is wired into the application container through volumesFrom, dependsOn, linuxParameters and a rewritten entryPoint, so detaching it means removing both the sidecar and those references. The code to export and modify the task definition follows:
import boto3


def back_export_ecs_task(cluster_name, service_name):
    """Export the service's current task definition and strip the Defender wiring from it."""
    ecs = boto3.client('ecs')
    # Get the current task definition ARN
    response = ecs.describe_services(cluster=cluster_name, services=[service_name])
    task_definition_arn = response['services'][0]['taskDefinition']
    # Export the current task definition
    response = ecs.describe_task_definition(taskDefinition=task_definition_arn)
    task_definition = response['taskDefinition']
    # Drop response-only fields that register_task_definition() rejects. pop() with a default
    # tolerates fields that are absent from a given definition (e.g. registeredBy). Note that
    # 'volumes' and 'placementConstraints' are accepted as input; dropping 'volumes' also removes
    # any volume the application itself declared, not only the one the Defender injection added.
    for field in ['taskDefinitionArn', 'revision', 'volumes', 'status', 'requiresAttributes',
                  'placementConstraints', 'compatibilities', 'registeredAt', 'registeredBy']:
        task_definition.pop(field, None)
    for container in task_definition['containerDefinitions']:
        if service_name in container['name']:
            # Restore the entry point that the Defender injection replaced. This value must
            # match the original entry point of your image.
            container['entryPoint'] = ['sh', '-c']
            # This clears every environment variable, not only those the Defender injected.
            container['environment'] = []
            container['volumesFrom'] = []
            # Remove the Defender dependency and the capabilities it added to this container.
            # pop() works on the matched container and tolerates the keys being absent.
            container.pop('linuxParameters', None)
            container.pop('dependsOn', None)
    # Remove the Defender sidecar itself
    task_definition['containerDefinitions'] = [
        d for d in task_definition['containerDefinitions'] if d['name'] != 'TwistlockDefender'
    ]
    return task_definition
Register the new task definition and update the service
Register the modified task definition with ECS and update the service to use it. ECS starts a new deployment, and the service runs without Prisma Cloud protection. The previous revision stays registered, so the Defender can be re-attached later by pointing the service back at it. The code to register the definition and update the service follows:
def update_ecs(cluster_name, service_name, task_definition):
    """Register the modified task definition as a new revision and roll the service onto it."""
    ecs = boto3.client('ecs')
    response = ecs.register_task_definition(**task_definition)
    task_definition_arn = response['taskDefinition']['taskDefinitionArn']
    # Update the service to use the new task definition; ECS starts a new deployment
    ecs.update_service(cluster=cluster_name, service=service_name, taskDefinition=task_definition_arn)
    print(f"{service_name}: detached TwistlockDefender, now using {task_definition_arn}")
Check whether a Twistlock Defender is present
Before removing anything, check whether the service's task definition contains a TwistlockDefender container, so that services without one are skipped.
def judge_twistlock_defender(cluster_name, service_name, region_name=None):
    """Return True if the service's current task definition contains a TwistlockDefender container."""
    # Create ECS client
    ecs_client = boto3.client('ecs', region_name=region_name)
    # Get the service's task definition
    response = ecs_client.describe_services(cluster=cluster_name, services=[service_name])
    task_definition_arn = response['services'][0]['taskDefinition']
    # Get the task definition's detailed information
    response = ecs_client.describe_task_definition(taskDefinition=task_definition_arn)
    task_definition = response['taskDefinition']
    # Check if a TwistlockDefender container exists in the task definition
    containers = task_definition['containerDefinitions']
    twistlock_defender_found = any(container['name'] == 'TwistlockDefender' for container in containers)
    return twistlock_defender_found
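The export-and-modify step and the Defender check above both operate on the task definition dict that describe_task_definition() returns, so the interesting logic can be pulled out of the Boto3 calls and tested offline. The following is an added example, not code from the original post or from Prisma Cloud: it takes the detect-and-strip logic the two sections describe and generalizes it so that only what points at the Defender is removed (the sidecar, the volumesFrom and dependsOn entries naming it, and volumes no remaining container mounts), while the application's own environment variables are kept. It assumes the sidecar is named TwistlockDefender (as on the page) and is wired into the application container through volumesFrom or dependsOn. The sample task definition, the /defender-wrapper.sh entry point, the image names and the SYS_PTRACE capability in it are constructed for the example.

```python
# Offline-testable detect-and-strip logic for Defender wiring in an ECS task definition.
# Works on the plain dict returned by describe_task_definition()['taskDefinition'].
import copy
import json

# Container name the Prisma Cloud Defender sidecar is registered under (as on the page).
DEFENDER_CONTAINER = "TwistlockDefender"

# Response-only fields that register_task_definition() does not accept as input.
READ_ONLY_FIELDS = ("taskDefinitionArn", "revision", "status", "requiresAttributes",
                    "compatibilities", "registeredAt", "registeredBy", "deregisteredAt")


def _wired_to_defender(container):
    """True if this container references the Defender through volumesFrom or dependsOn."""
    from_defender = any(v.get("sourceContainer") == DEFENDER_CONTAINER
                        for v in container.get("volumesFrom", []))
    depends_on = any(d.get("containerName") == DEFENDER_CONTAINER
                     for d in container.get("dependsOn", []))
    return from_defender or depends_on


def find_defender_artifacts(task_def):
    """List what ties the task definition to the Defender; [] means it is clean.

    Usable both as the pre-check (the page's judge step) and to verify the result.
    """
    findings = []
    containers = task_def.get("containerDefinitions", [])
    if any(c["name"] == DEFENDER_CONTAINER for c in containers):
        findings.append(f"sidecar {DEFENDER_CONTAINER} present")
    for c in containers:
        if c["name"] != DEFENDER_CONTAINER and _wired_to_defender(c):
            findings.append(f"{c['name']} references {DEFENDER_CONTAINER}")
    return findings


def strip_defender(task_def, restore_entry_point=None):
    """Return a new dict ready for register_task_definition(**result); the input is untouched.

    Removes the sidecar, the volumesFrom/dependsOn entries naming it, and volumes no
    remaining container mounts. On containers that were wired to the Defender it also
    drops linuxParameters (the injection adds capabilities there; re-add your own if the
    task had any) and replaces entryPoint with restore_entry_point, or deletes it so the
    image's own ENTRYPOINT applies. Application environment variables are kept.
    """
    td = copy.deepcopy(task_def)
    for field in READ_ONLY_FIELDS:
        td.pop(field, None)
    containers = [c for c in td.get("containerDefinitions", []) if c["name"] != DEFENDER_CONTAINER]
    for c in containers:
        if not _wired_to_defender(c):
            continue
        c["volumesFrom"] = [v for v in c.get("volumesFrom", [])
                            if v.get("sourceContainer") != DEFENDER_CONTAINER]
        c["dependsOn"] = [d for d in c.get("dependsOn", [])
                          if d.get("containerName") != DEFENDER_CONTAINER]
        for key in ("volumesFrom", "dependsOn"):
            if not c[key]:
                del c[key]
        c.pop("linuxParameters", None)
        if restore_entry_point is None:
            c.pop("entryPoint", None)
        else:
            c["entryPoint"] = list(restore_entry_point)
    td["containerDefinitions"] = containers
    # Keep only the volumes that some remaining container still mounts.
    mounted = {m["sourceVolume"] for c in containers for m in c.get("mountPoints", [])}
    td["volumes"] = [v for v in td.get("volumes", []) if v["name"] in mounted]
    if not td["volumes"]:
        del td["volumes"]
    return td


# Constructed sample: a Fargate task with the Defender injected into the "web" container.
# The wrapper path, images and capability are placeholders, not real Prisma Cloud artifacts.
SAMPLE_TASK_DEF = {
    "taskDefinitionArn": "arn:aws:ecs:us-east-1:123456789012:task-definition/web:7",
    "family": "web", "revision": 7, "status": "ACTIVE",
    "networkMode": "awsvpc", "requiresCompatibilities": ["FARGATE"],
    "cpu": "512", "memory": "1024",
    "executionRoleArn": "arn:aws:iam::123456789012:role/ecsTaskExecutionRole",
    "volumes": [{"name": "data", "host": {}},            # the application's own volume
                {"name": "twistlock-vol", "host": {}}],  # mounted only by the sidecar
    "containerDefinitions": [
        {"name": "web", "image": "example.com/web:1.0", "essential": True,
         "entryPoint": ["/defender-wrapper.sh"],          # rewritten by the injection
         "command": ["python", "app.py"],
         "environment": [{"name": "APP_ENV", "value": "test"}],
         "mountPoints": [{"sourceVolume": "data", "containerPath": "/data"}],
         "volumesFrom": [{"sourceContainer": DEFENDER_CONTAINER, "readOnly": False}],
         "dependsOn": [{"containerName": DEFENDER_CONTAINER, "condition": "HEALTHY"}],
         "linuxParameters": {"capabilities": {"add": ["SYS_PTRACE"]}}},
        {"name": DEFENDER_CONTAINER, "image": "example.com/defender:latest", "essential": False,
         "mountPoints": [{"sourceVolume": "twistlock-vol", "containerPath": "/var/lib/twistlock"}]},
    ],
}

if __name__ == "__main__":
    print("before:", find_defender_artifacts(SAMPLE_TASK_DEF))
    cleaned = strip_defender(SAMPLE_TASK_DEF)
    print("after:", find_defender_artifacts(cleaned))
    print(json.dumps(cleaned, indent=2))
```

In the script on this page, strip_defender() would replace the body of back_export_ecs_task() after the describe_task_definition() call, and find_defender_artifacts() would serve as both the pre-check and a post-deployment verification against the new revision. Because the function returns a new dict and leaves the input alone, the exported original can be written to disk before registering the new revision, which is what you need for re-attaching the Defender later.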
Get the list of ECS services
Use Boto3 to list all services in the given ECS cluster so that they can be processed in bulk.
def get_service_names(cluster_name, region_name=None):
    """Return the names of all services in the cluster, following pagination."""
    # Create ECS client
    ecs_client = boto3.client('ecs', region_name=region_name)
    service_names = []
    # list_services() returns a limited number of ARNs per call; the paginator follows nextToken
    paginator = ecs_client.get_paginator('list_services')
    for page in paginator.paginate(cluster=cluster_name):
        for service_arn in page['serviceArns']:
            # The service name is the last path segment of the ARN
            service_names.append(service_arn.split('/')[-1])
    return service_names
Main function
def main():
    cluster_name = 'test'  # ECS cluster name; cluster names can also be fetched in bulk with list_clusters()
    service_names = get_service_names(cluster_name)
    for service_name in service_names:
        if judge_twistlock_defender(cluster_name, service_name, region_name=None):
            task = back_export_ecs_task(cluster_name, service_name)
            try:
                update_ecs(cluster_name, service_name, task)
            except Exception as e:
                print("service_name error", service_name, e)


if __name__ == '__main__':
    main()
Conclusion
Removing Prisma Cloud protection from AWS ECS Fargate gives developers more flexibility to develop and debug their applications. Before running this procedure, test it thoroughly in a controlled environment, and keep in mind what it does: pointed at a cluster, the script detaches runtime protection from every service that has a Defender, and it does so through the same RegisterTaskDefinition and UpdateService calls an attacker would use to evade the Defender, so those CloudTrail events deserve attention whether or not you ran them. Restrict the script to development and test clusters, keep the exported original task definitions (or the previous revision ARNs, which remain registered) so the Defender can be re-attached when testing is done, and record the run as a change. Balancing security and flexibility this way lets us adapt to the needs of each stage of development and testing.