1. 写在前面
在实际项目开发过程中,有时需要考虑数据库或表大小,以避免如:日志记录等数据大量填充,导致数据库臃肿。本文以 PostgreSQL 数据库为例,简单演示在 Django 中如何监控数据库大小及自动清理数据;
公众号: 滑翔的纸飞机
2. PostgreSQL 命令
-
进入 PostgreSQL 终端
在命令行中,键入以下命令,对应替换 dbname 、username:
psql dbname username
示例输出:
psql testdb jpz psql (10.16 (Debian 10.16-1.pgdg90+1)) Type "help" for help. testdb=#
备注: 若提示"password",则输入用户对应密码
-
确定数据库的大小,同理替换对应 db name: 命令:
SELECT pg_size_pretty( pg_database_size('dbname') );
示例输出:
testdb=# SELECT pg_size_pretty( pg_database_size('testdb') ); pg_size_pretty ---------------- 9199 kB (1 row)
-
确定当前数据库中表的大小,同理替换对应 table name: 命令:
SELECT pg_size_pretty( pg_total_relation_size('tablename') );
示例输出:
attack=# SELECT pg_size_pretty( pg_total_relation_size('django_migrations') ); pg_size_pretty ---------------- 32 kB (1 row)
3. Django 示例
场景:每隔5 秒(便于实验),检查 PostgreSQL 数据库大小,当存储超过设定阈值,触发数据库数据自动删除事件,直到存储小于设定阈值。
3.1. Django 初始化
版本说明:
Python:3.11.2,Django:4.2.2,PostgreSQL:14.6,Redis:7.0.5-alpine,Celery:5.2.7
本文重点不在于介绍环境安装,因此相关环境提前准备,可以基于 Docker 快速部署一个 PostgreSQL 和 Redis, Python 、Django、Celery可以基于虚拟环境进行安装;
那我们开始吧!
进入已准备妥善并安装Django的虚拟环境;
-
初始化 Django 项目
命令:
django-admin startproject <mydb>
当前目录下,创建一个 mydb Django项目,mydb 为项目名称,可替换成其他名字;
启动 Django 内置开发服务验证:
命令:
cd myapp/ python manage.py runserver 0.0.0.0:8888
备注:没有特殊说明,均在 mydb 根目录下执行命令;
打开浏览器键入
http://IP:8888/
可看到 Django 经典页面,表示初始化动作已完成。若抛出类似错误:
Invalid HTTP_HOST header: '127.0.0.1:8888'. You may need to add '127.0.0.1' to ALLOWED_HOSTS.
修改:
vi myapp/settings.py
ALLOWED_HOSTS = [] => ALLOWED_HOSTS = ['*']
-
创建 App
命令:
django-admin startapp app1
编辑:
vi ./mydb/settings.py
修改INSTALLED_APPS
添加 app1,如:INSTALLED_APPS = [ 'django.contrib.admin', 'django.contrib.auth', 'django.contrib.contenttypes', 'django.contrib.sessions', 'django.contrib.messages', 'django.contrib.staticfiles', 'app1', # 添加 app1 ]
-
Django 项目,增加 PostgreSQL 配置
编辑:
vi ./mydb/settings.py
增加 DB 配置:# PostgreSQL 配置 DATABASES = { "default": { "ENGINE": "django.db.backends.postgresql", "NAME": 'mydb', "USER": '***', "PASSWORD": '***', "HOST": '***', "PORT": '5432', } }
-
初始化数据库同步 命令:
python manage.py makemigrations python manage.py migrate
到此,简单的 Django 项目初始化配置完成,数据库 PostgreSQL;
3.2. Celery 定时任务
关于 Celery 介绍不再本文中叙述,后面单独介绍,这里重点介绍 DB 数据清理;
-
Settings 配置 Redis、Celery
编辑:settings.py
命令:
vi ./mydb/settings.py
,增加如下配置:# Redis 配置 BASE_REDIS = f"redis://:***@***:6379/" REDIS_DB = "0" REDIS_URL = BASE_REDIS + REDIS_DB # celery配置 CELERY_BROKER_URL = BASE_REDIS + "1" CELERY_RESULT_SERIALIZER = "json" CELERY_TASK_SERIALIZER = "json" CELERY_ACCEPT_CONTENT = ["json"] CELERY_IGNORE_RESULT = True CELERY_TASK_ROUTES = { "app1.scheduler_tasks.*": { "queue": "my_db_scheduler", "routing_key": "my_db_scheduler", }, } CELERY_IMPORTS = (task.replace(".*", "") for task in CELERY_TASK_ROUTES)
-
新建 Celery App
新建:
vi ./mydb/celery.py
, 添加如下内容:import os from celery import Celery os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mydb.settings') app = Celery("mydb") app.config_from_object('django.conf:settings', namespace='CELERY') app.autodiscover_tasks()
-
Celery 服务启动
Celery Worker: 命令:
celery -A mydb worker -l INFO -n my_db_scheduler -Q my_db_scheduler -P eventlet -c 5 --max-tasks-per-child 500
输出:
(py3.11-env)jpz@dev:~/workspace/mydb$ celery -A mydb worker -l INFO -n my_db_scheduler -Q my_db_scheduler -P eventlet -c 5 --max-tasks-per-child 500
-------------- celery@my_db_scheduler v5.2.7 (dawn-chorus) --- ***** ----- -- ******* ---- Linux-5.4.0-164-generic-x86_64-with-glibc2.31 2023-10-23 16:25:52
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: mydb:0x7f2fe417aa50
- ** ---------- .> transport: redis://:@*:6379/1
- ** ---------- .> results: disabled://
- *** --- * --- .> concurrency: 5 (eventlet) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** ----- -------------- [queues] .> my_db_scheduler exchange=my_db_scheduler(direct) key=my_db_scheduler
celery -A mydb beat -l info --schedule celerybeat-schedule.db通过 eventlet 启动 5 个协程; **Celery Beat:** 命令:
(py3.11-env)jpz@dev:~/workspace/mydb$ celery -A mydb beat -l info --schedule celerybeat-schedule.db celery beat v5.2.7 (dawn-chorus) is starting. __ - ... __ - _ LocalTime -> 2023-10-23 16:22:08 Configuration -> . broker -> redis://:@*:6379/1 . loader -> celery.loaders.app.AppLoader . scheduler -> celery.beat.PersistentScheduler . db -> celerybeat-schedule.db . logfile -> [stderr]@%INFO . maxinterval -> 5.00 minutes (300s) [2023-10-23 16:22:08,181: INFO/MainProcess] beat: Starting...输出:
3.3. PostgreSQL DB 自动清理
-
定义 Models
编辑:
vi ./mydb/app1/models.py
添加:from django.db import models class Student(models.Model): """ 学生 """ name = models.CharField(verbose_name="名字", max_length=50) age = models.IntegerField(verbose_name='年纪', default=0) create_time = models.DateTimeField(verbose_name='创建时间', auto_now_add=True, null=True) class Meta: verbose_name = '学生' ordering = ('-id',) # lastest() 获取模型中的最近的记录、earliest() 获取模型中的最早的记录,基于 get_latest_by 定义的字段 get_latest_by = 'create_time'
同步数据库:
python manage.py makemigrations python manage.py migrate
-
Redis 工具 编辑
vi ./mydb/utils/redis.py
添加:from redis import StrictRedis from django.conf import settings rs = StrictRedis.from_url(settings.REDIS_URL, decode_responses=True)
-
定时任务
结合 PostgreSQL 命令,通过Django执行原始Sql命令检查DB大小,超过设定阀值自动删除 Student 表数据,按照先建先删的原则,直到 DB 小于设定阀值;
设定阀值:
vi ./mydb/settings.py
# DB 大小上限 DB_MAX_LIMMIT = 0.009
定时任务:
vi ./mydb/app1/scheduler_tasks.py
import logging from celery import shared_task from django.conf import settings from django.db import connection from app1.models import Student from utils.redis import rs logger = logging.getLogger(__name__) def get_db_size() -> int: """ 查询 postgres db size,返回统一单位GB :return: """ size_gb = 0 try: with connection.cursor() as cursor: # 执行SQL cursor.execute(f"SELECT pg_size_pretty(pg_database_size('{settings.DATABASES['default']['NAME']}'))") re = cursor.fetchall()[0][0].split(' ') if re[1].upper() == 'KB': size_gb = int(re[0]) / 1024 / 1024 elif re[1].upper() == 'MB': size_gb = int(re[0]) / 1024 elif re[1].upper() == 'GB': size_gb = int(re[0]) elif re[1].upper() == 'TB': size_gb = int(re[0]) * 1024 logger.info(f'数据库:{settings.DATABASES["default"]["NAME"]},当前存储用量:{re} <=> {size_gb} GB') except Exception as e: logger.error(f'数据库:{settings.DATABASES["default"]["NAME"]},当前存储用量获取失败:{e}') return size_gb @shared_task def clean_attack_record(): """ 周期:每隔1分钟检查数据库表记录,大于阈值自动清理历史数据; """ logger.info(f'数据库:自动清理任务') db_size = get_db_size() db_tables = [Student, ] # 加锁,避免重叠执行 with rs.lock('clean:db:tables:record'): # 标记清理数据是否异常,异常则退出循坏 is_ok = 1 while db_size >= int(settings.DB_MAX_LIMMIT) and len(db_tables) != 0 and is_ok: for table in db_tables: logger.info(f'数据库:{settings.DATABASES["default"]["NAME"]},表:{table},开始执行自动清理任务') try: table.objects.earliest().delete() except (Student.DoesNotExist, ): logger.info(f'数据库:{settings.DATABASES["default"]["NAME"]},表:{table},没有记录') db_tables.remove(table) except Exception as e: is_ok = 0 logger.error(f'数据库:{settings.DATABASES["default"]["NAME"]},记录自动清理失败:{e}') break db_size = get_db_size()
Celery 添加定时任务:
编辑: `vi ./mydb/settings.py` ``` CELERY_BEAT_SCHEDULE = { "clean_attack_record": { "task": "mydb.scheduler_tasks.clean_attack_record", "schedule": 5, }, } ```
-
重启 Celery 任务
Celery Worker 命令: celery -A mydb worker -l INFO -n my_db_scheduler -Q my_db_scheduler -P eventlet -c 5 --max-tasks-per-child 500 Celery Beat 命令: celery -A mydb beat -l info --schedule celerybeat-schedule.db
Celery Worker 输出日志:
[2023-10-23 16:24:28,279: INFO/MainProcess] 数据库:自动清理任务 [2023-10-23 16:24:28,295: INFO/MainProcess] 数据库:mydb,当前存储用量:['9169', 'kB'] <=> 0.008744239807128906 GB [2023-10-23 16:24:28,297: INFO/MainProcess] 数据库:mydb,表:<class 'app1.models.Student'>,开始执行自动清理任务 [2023-10-23 16:24:28,299: INFO/MainProcess] 数据库:mydb,表:<class 'app1.models.Student'>,没有记录 [2023-10-23 16:24:28,301: INFO/MainProcess] 数据库:mydb,当前存储用量:['9169', 'kB'] <=> 0.008744239807128906 GB
Celery Beat 输出日志:
Configuration -> . broker -> redis://:**@***:6379/1 . loader -> celery.loaders.app.AppLoader . scheduler -> celery.beat.PersistentScheduler . db -> celerybeat-schedule.db . logfile -> [stderr]@%INFO . maxinterval -> 5.00 minutes (300s) [2023-10-23 16:22:08,181: INFO/MainProcess] beat: Starting... [2023-10-23 16:22:08,275: INFO/MainProcess] Scheduler: Sending due task clean_attack_record (app1.scheduler_tasks.clean_attack_record) [2023-10-23 16:22:13,270: INFO/MainProcess] Scheduler: Sending due task clean_attack_record (app1.scheduler_tasks.clean_attack_record) [2023-10-23 16:22:18,270: INFO/MainProcess] Scheduler: Sending due task clean_attack_record (app1.scheduler_tasks.clean_attack_record)
3.4 代码
目录结构:
(py3.11-env) jpz@dev:~/workspace/mydb$ tree -L 2 ./
./
├── app1
│ ├── __init__.py
│ ├── admin.py
│ ├── apps.py
│ ├── migrations
│ ├── models.py
│ ├── scheduler_tasks.py
│ ├── tests.py
│ └── views.py
├── celerybeat-schedule.db
├── manage.py
├── mydb
│ ├── __init__.py
│ ├── asgi.py
│ ├── celery.py
│ ├── settings.py
│ ├── urls.py
│ └── wsgi.py
└── utils
└── redis.py
settings.py:
"""
Django settings for mydb project.
Generated by 'django-admin startproject' using Django 4.2.2.
For more information on this file, see
https://docs.djangoproject.com/en/4.2/topics/settings/
For the full list of settings and their values, see
https://docs.djangoproject.com/en/4.2/ref/settings/
"""
from pathlib import Path
# Build paths inside the project like this: BASE_DIR / 'subdir'.
BASE_DIR = Path(__file__).resolve().parent.parent
# Quick-start development settings - unsuitable for production
# See https://docs.djangoproject.com/en/4.2/howto/deployment/checklist/
# SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = 'django-insecure-$py*h=j54v4*%$b4+1=o-p(ov4pcv__==z!b%z3d#!=m43u#ai'
# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = True
ALLOWED_HOSTS = ["*"]
# Application definition
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'app1',
]
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
]
ROOT_URLCONF = 'mydb.urls'
TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': [],
'APP_DIRS': True,
'OPTIONS': {
'context_processors': [
'django.template.context_processors.debug',
'django.template.context_processors.request',
'django.contrib.auth.context_processors.auth',
'django.contrib.messages.context_processors.messages',
],
},
},
]
WSGI_APPLICATION = 'mydb.wsgi.application'
# Password validation
# https://docs.djangoproject.com/en/4.2/ref/settings/#auth-password-validators
AUTH_PASSWORD_VALIDATORS = [
{
'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
},
]
# Internationalization
# https://docs.djangoproject.com/en/4.2/topics/i18n/
LANGUAGE_CODE = 'en-us'
TIME_ZONE = 'UTC'
USE_I18N = True
USE_TZ = True
# Static files (CSS, JavaScript, Images)
# https://docs.djangoproject.com/en/4.2/howto/static-files/
STATIC_URL = 'static/'
# Default primary key field type
# https://docs.djangoproject.com/en/4.2/ref/settings/#default-auto-field
DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField'
# mydb 项目配置
# PostgreSQL 配置
DATABASES = {
"default": {
"ENGINE": "django.db.backends.postgresql",
"NAME": 'mydb',
"USER": '***',
"PASSWORD": '***',
"HOST": '***',
"PORT": '5432',
}
}
# Redis 配置
BASE_REDIS = f"redis://:***@***:6379/"
REDIS_DB = "0"
REDIS_URL = BASE_REDIS + REDIS_DB
# celery配置
CELERY_BROKER_URL = BASE_REDIS + "1"
CELERY_RESULT_SERIALIZER = "json"
CELERY_TASK_SERIALIZER = "json"
CELERY_ACCEPT_CONTENT = ["json"]
CELERY_IGNORE_RESULT = True
CELERY_TASK_ROUTES = {
"app1.scheduler_tasks.*": {
"queue": "my_db_scheduler",
"routing_key": "my_db_scheduler",
},
}
CELERY_IMPORTS = (task.replace(".*", "") for task in CELERY_TASK_ROUTES)
CELERY_BEAT_SCHEDULE = {
"clean_attack_record": {
"task": "app1.scheduler_tasks.clean_attack_record",
"schedule": 5,
},
}
# DB 大小上限
DB_MAX_LIMMIT = 0.009
celery.py:
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mydb.settings')
app = Celery("mydb")
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
redis.py
from redis import StrictRedis
from django.conf import settings
rs = StrictRedis.from_url(settings.REDIS_URL, decode_responses=True)
scheduler_tasks.py:
import logging
from celery import shared_task
from django.conf import settings
from django.db import connection
from app1.models import Student
from utils.redis import rs
logger = logging.getLogger(__name__)
def get_db_size() -> int:
"""
查询 postgres db size,返回统一单位GB
:return:
"""
size_gb = 0
try:
with connection.cursor() as cursor:
cursor.execute(f"SELECT pg_size_pretty(pg_database_size('{settings.DATABASES['default']['NAME']}'))")
re = cursor.fetchall()[0][0].split(' ')
if re[1].upper() == 'KB':
size_gb = int(re[0]) / 1024 / 1024
elif re[1].upper() == 'MB':
size_gb = int(re[0]) / 1024
elif re[1].upper() == 'GB':
size_gb = int(re[0])
elif re[1].upper() == 'TB':
size_gb = int(re[0]) * 1024
logger.info(f'数据库:{settings.DATABASES["default"]["NAME"]},当前存储用量:{re} <=> {size_gb} GB')
except Exception as e:
logger.error(f'数据库:{settings.DATABASES["default"]["NAME"]},当前存储用量获取失败:{e}')
return size_gb
@shared_task
def clean_attack_record():
"""
周期:每隔1分钟检查数据库表记录,大于阈值自动清理历史数据;
"""
logger.info(f'数据库:自动清理任务')
db_size = get_db_size()
db_tables = [Student, ]
with rs.lock('clean:db:tables:record'):
# 标记清理数据是否异常,异常则退出循坏
is_ok = 1
while db_size >= int(settings.DB_MAX_LIMMIT) and len(db_tables) != 0 and is_ok:
for table in db_tables:
logger.info(f'数据库:{settings.DATABASES["default"]["NAME"]},表:{table},开始执行自动清理任务')
try:
table.objects.earliest().delete()
except (Student.DoesNotExist, ):
logger.info(f'数据库:{settings.DATABASES["default"]["NAME"]},表:{table},没有记录')
db_tables.remove(table)
except Exception as e:
is_ok = 0
logger.error(f'数据库:{settings.DATABASES["default"]["NAME"]},记录自动清理失败:{e}')
break
db_size = get_db_size()
4.最后
本文基于 PostgreSQL 给出了一种基于 Django Celery 定时任务巡检 DB 大小,并清理数据,通过 Demo 形式说明;