django+celery如何實(shí)現(xiàn)定時(shí)拉取阿里云rocketmq實(shí)例信息
一、項(xiàng)目初始化
1. 創(chuàng)建虛擬環(huán)境并安裝依賴
# 創(chuàng)建虛擬環(huán)境 python3 -m venv env source env/bin/activate # 安裝依賴 pip install django celery redis django-celery-beat aliyun-python-sdk-core-v3 aliyun-python-sdk-mq mysqlclient
2. 創(chuàng)建 Django 項(xiàng)目和應(yīng)用
# 創(chuàng)建項(xiàng)目 django-admin startproject rocketmq_manager cd rocketmq_manager # 創(chuàng)建應(yīng)用 python manage.py startapp rocketmq
3. 配置 MySQL 數(shù)據(jù)庫(rocketmq_manager/settings.py)
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.mysql',
'NAME': 'rocketmq_manager', # 數(shù)據(jù)庫名
'USER': 'your_username', # 用戶名
'PASSWORD': 'your_password', # 密碼
'HOST': 'localhost', # 主機(jī)
'PORT': '3306', # 端口
'OPTIONS': {
'init_command': "SET sql_mode='STRICT_TRANS_TABLES'",
},
}
}
4. 配置項(xiàng)目其他設(shè)置(rocketmq_manager/settings.py)
INSTALLED_APPS = [
# ...
'django_celery_beat',
'django_celery_results',
'rocketmq',
]
# Celery配置
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'django-db'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Shanghai'
# 阿里云配置(從環(huán)境變量獲取)
ALIYUN_ACCESS_KEY_ID = os.environ.get('ALIYUN_ACCESS_KEY_ID')
ALIYUN_ACCESS_KEY_SECRET = os.environ.get('ALIYUN_ACCESS_KEY_SECRET')
ALIYUN_REGION_ID = os.environ.get('ALIYUN_REGION_ID', 'cn-hangzhou')
二、Celery 集成配置
1. 創(chuàng)建 Celery 應(yīng)用(rocketmq_manager/celery.py)
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'rocketmq_manager.settings')
app = Celery('rocketmq_manager')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}')
2. 初始化 Celery(rocketmq_manager/__init__.py)
from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app
__all__ = ('celery_app',)
三、Model 開發(fā)
創(chuàng)建 RocketMQ 實(shí)例模型(rocketmq/models.py)
python
運(yùn)行
from django.db import models
from django.utils import timezone
class RocketMQInstance(models.Model):
instance_id = models.CharField('實(shí)例ID', max_length=100, unique=True)
instance_name = models.CharField('實(shí)例名稱', max_length=200, blank=True, null=True)
instance_type = models.CharField('實(shí)例類型', max_length=50, blank=True, null=True)
region_id = models.CharField('區(qū)域ID', max_length=50)
status = models.CharField('狀態(tài)', max_length=50, blank=True, null=True)
create_time = models.DateTimeField('創(chuàng)建時(shí)間', blank=True, null=True)
expire_time = models.DateTimeField('過期時(shí)間', blank=True, null=True)
tags = models.JSONField('標(biāo)簽', blank=True, null=True)
last_updated = models.DateTimeField('最后更新時(shí)間', auto_now=True)
def __str__(self):
return f"{self.instance_name} ({self.instance_id})"
class Meta:
verbose_name = 'RocketMQ實(shí)例'
verbose_name_plural = 'RocketMQ實(shí)例列表'
indexes = [
models.Index(fields=['instance_id', 'region_id']),
]
class InstanceSyncLog(models.Model):
sync_time = models.DateTimeField('同步時(shí)間', auto_now_add=True)
instance_count = models.IntegerField('實(shí)例數(shù)量', default=0)
success = models.BooleanField('是否成功', default=True)
error_message = models.TextField('錯誤信息', blank=True, null=True)
execution_time = models.FloatField('執(zhí)行時(shí)間(秒)', blank=True, null=True)
def __str__(self):
return f"同步記錄 - {self.sync_time}"
class Meta:
verbose_name = '實(shí)例同步日志'
verbose_name_plural = '實(shí)例同步日志列表'
ordering = ['-sync_time']
遷移數(shù)據(jù)庫
python manage.py makemigrations python manage.py migrate
四、定時(shí)任務(wù)代碼
創(chuàng)建阿里云 API 客戶端(rocketmq/aliyun_client.py)
import os
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.acs_exception.exceptions import ClientException
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkmq.model.v20190513 import DescribeInstancesRequest
import json
import time
class AliyunRocketMQClient:
def __init__(self):
self.access_key_id = os.environ.get('ALIYUN_ACCESS_KEY_ID')
self.access_key_secret = os.environ.get('ALIYUN_ACCESS_KEY_SECRET')
self.region_id = os.environ.get('ALIYUN_REGION_ID', 'cn-hangzhou')
self.client = AcsClient(self.access_key_id, self.access_key_secret, self.region_id)
def get_instances(self):
try:
request = DescribeInstancesRequest.DescribeInstancesRequest()
request.set_accept_format('json')
# 添加重試機(jī)制
max_retries = 3
for attempt in range(max_retries):
try:
response = self.client.do_action_with_exception(request)
return json.loads(response)
except (ClientException, ServerException) as e:
if attempt < max_retries - 1:
wait_time = (attempt + 1) * 2
print(f"請求失敗,{wait_time}秒后重試: {str(e)}")
time.sleep(wait_time)
else:
raise
except Exception as e:
print(f"獲取實(shí)例信息失敗: {str(e)}")
raise
定義定時(shí)任務(wù)(rocketmq/tasks.py)
from celery import shared_task
from .models import RocketMQInstance, InstanceSyncLog
from .aliyun_client import AliyunRocketMQClient
import logging
from datetime import datetime
import time
logger = logging.getLogger(__name__)
@shared_task(bind=True, autoretry_for=(Exception,), retry_backoff=5, retry_kwargs={'max_retries': 3})
def sync_rocketmq_instances(self):
start_time = time.time()
try:
client = AliyunRocketMQClient()
response = client.get_instances()
# 處理響應(yīng)數(shù)據(jù)
instance_list = []
if 'Data' in response and 'InstanceDoList' in response['Data']:
for item in response['Data']['InstanceDoList']:
instance = {
'instance_id': item.get('InstanceId', ''),
'instance_name': item.get('InstanceName', ''),
'instance_type': item.get('InstanceType', ''),
'region_id': item.get('RegionId', ''),
'status': item.get('InstanceStatus', ''),
'create_time': datetime.fromtimestamp(item.get('CreateTime', 0) / 1000) if item.get('CreateTime') else None,
'expire_time': datetime.fromtimestamp(item.get('ExpireTime', 0) / 1000) if item.get('ExpireTime') else None,
'tags': item.get('Tags', {})
}
instance_list.append(instance)
# 使用事務(wù)批量更新數(shù)據(jù)庫
from django.db import transaction
with transaction.atomic():
# 先刪除不存在的實(shí)例(可選)
# existing_ids = [item['instance_id'] for item in instance_list]
# RocketMQInstance.objects.exclude(instance_id__in=existing_ids).delete()
# 批量更新或創(chuàng)建實(shí)例
for instance_data in instance_list:
RocketMQInstance.objects.update_or_create(
instance_id=instance_data['instance_id'],
defaults=instance_data
)
execution_time = time.time() - start_time
# 記錄同步日志
log = InstanceSyncLog.objects.create(
instance_count=len(instance_list),
success=True,
execution_time=execution_time
)
logger.info(f"成功同步 {len(instance_list)} 個(gè)RocketMQ實(shí)例,耗時(shí): {execution_time:.2f}秒")
return f"同步完成,共 {len(instance_list)} 個(gè)實(shí)例,耗時(shí): {execution_time:.2f}秒"
except Exception as e:
execution_time = time.time() - start_time
# 記錄錯誤日志
InstanceSyncLog.objects.create(
success=False,
error_message=str(e),
execution_time=execution_time
)
logger.error(f"同步RocketMQ實(shí)例失敗: {str(e)},耗時(shí): {execution_time:.2f}秒")
raise
五、接口開發(fā)
1. 創(chuàng)建序列化器(rocketmq/serializers.py)
from rest_framework import serializers
from .models import RocketMQInstance, InstanceSyncLog
class RocketMQInstanceSerializer(serializers.ModelSerializer):
class Meta:
model = RocketMQInstance
fields = '__all__'
read_only_fields = ['last_updated']
class InstanceSyncLogSerializer(serializers.ModelSerializer):
class Meta:
model = InstanceSyncLog
fields = '__all__'
read_only_fields = ['sync_time', 'instance_count', 'success', 'error_message', 'execution_time']
2. 創(chuàng)建視圖集(rocketmq/views.py)
from rest_framework import viewsets, status
from rest_framework.response import Response
from .models import RocketMQInstance, InstanceSyncLog
from .serializers import RocketMQInstanceSerializer, InstanceSyncLogSerializer
from .tasks import sync_rocketmq_instances
from rest_framework.decorators import action
from rest_framework.permissions import IsAuthenticated
from rest_framework.authentication import TokenAuthentication
class RocketMQInstanceViewSet(viewsets.ModelViewSet):
queryset = RocketMQInstance.objects.all()
serializer_class = RocketMQInstanceSerializer
authentication_classes = [TokenAuthentication]
permission_classes = [IsAuthenticated]
@action(detail=False, methods=['post'])
def sync_now(self, request):
"""立即觸發(fā)實(shí)例同步"""
task = sync_rocketmq_instances.delay()
return Response({'task_id': task.id, 'message': '同步任務(wù)已啟動'}, status=status.HTTP_202_ACCEPTED)
@action(detail=False, methods=['get'])
def regions(self, request):
"""獲取所有區(qū)域列表"""
regions = RocketMQInstance.objects.values_list('region_id', flat=True).distinct()
return Response(regions, status=status.HTTP_200_OK)
class InstanceSyncLogViewSet(viewsets.ReadOnlyModelViewSet):
queryset = InstanceSyncLog.objects.all().order_by('-sync_time')
serializer_class = InstanceSyncLogSerializer
authentication_classes = [TokenAuthentication]
permission_classes = [IsAuthenticated]
3. 配置 URL(rocketmq/urls.py)
from django.urls import include, path
from rest_framework import routers
from .views import RocketMQInstanceViewSet, InstanceSyncLogViewSet
router = routers.DefaultRouter()
router.register(r'instances', RocketMQInstanceViewSet)
router.register(r'sync-logs', InstanceSyncLogViewSet)
urlpatterns = [
path('', include(router.urls)),
]
4. 項(xiàng)目 URL 配置(rocketmq_manager/urls.py)
from django.contrib import admin
from django.urls import path, include
from rest_framework.authtoken.views import obtain_auth_token
urlpatterns = [
path('admin/', admin.site.urls),
path('api/', include('rocketmq.urls')),
path('api/token/', obtain_auth_token, name='api_token_auth'), # 獲取認(rèn)證令牌
]
六、配置定時(shí)任務(wù)
在settings.py中添加定時(shí)任務(wù)配置
CELERY_BEAT_SCHEDULE = {
'sync-rocketmq-instances': {
'task': 'rocketmq.tasks.sync_rocketmq_instances',
'schedule': 3600.0, # 每小時(shí)執(zhí)行一次
'args': ()
},
}
七、啟動服務(wù)
1. 設(shè)置環(huán)境變量
export ALIYUN_ACCESS_KEY_ID=your_access_key_id export ALIYUN_ACCESS_KEY_SECRET=your_access_key_secret export ALIYUN_REGION_ID=cn-hangzhou # 根據(jù)實(shí)際情況修改
2. 啟動 Redis
redis-server
3. 啟動 Celery Worker
celery -A rocketmq_manager worker --loglevel=info --pool=prefork --concurrency=4
4. 啟動 Celery Beat
celery -A rocketmq_manager beat --loglevel=info --scheduler django_celery_beat.schedulers:DatabaseScheduler
5. 啟動 Django 開發(fā)服務(wù)器
python manage.py runserver
八、API 測試
1. 獲取認(rèn)證令牌
curl -X POST -d "username=your_username&password=your_password" http://localhost:8000/api/token/
2. 獲取 RocketMQ 實(shí)例列表
curl -H "Authorization: Token your_token_here" http://localhost:8000/api/instances/
3. 獲取同步日志
curl -H "Authorization: Token your_token_here" http://localhost:8000/api/sync-logs/
4. 手動觸發(fā)同步
curl -X POST -H "Authorization: Token your_token_here" http://localhost:8000/api/instances/sync_now/
項(xiàng)目結(jié)構(gòu)
rocketmq_manager/ ├── rocketmq_manager/ │ ├── __init__.py │ ├── celery.py │ ├── settings.py │ ├── urls.py │ └── wsgi.py ├── rocketmq/ │ ├── migrations/ │ ├── __init__.py │ ├── admin.py │ ├── apps.py │ ├── aliyun_client.py │ ├── models.py │ ├── serializers.py │ ├── tasks.py │ ├── urls.py │ └── views.py ├── manage.py └── db.sqlite3
關(guān)鍵特性說明
- MySQL 存儲:使用 MySQL 數(shù)據(jù)庫存儲 RocketMQ 實(shí)例信息和同步日志
- 定時(shí)同步:每小時(shí)自動拉取阿里云 RocketMQ 實(shí)例信息
- 數(shù)據(jù)持久化:將實(shí)例信息存儲到數(shù)據(jù)庫,支持索引加速查詢
- 手動觸發(fā):提供 API 接口支持手動觸發(fā)同步
- 錯誤處理:任務(wù)失敗自動重試,記錄詳細(xì)的同步日志和執(zhí)行時(shí)間
- 權(quán)限控制:使用 Token 認(rèn)證保護(hù) API 接口
擴(kuò)展建議
- 添加更多阿里云 API 調(diào)用,獲取更詳細(xì)的實(shí)例指標(biāo)(如 TPS、消息堆積量等)
- 實(shí)現(xiàn)多區(qū)域支持,同時(shí)監(jiān)控多個(gè)地域的 RocketMQ 實(shí)例
- 添加告警機(jī)制,當(dāng)實(shí)例狀態(tài)異?;蛲绞r(shí)發(fā)送通知
- 集成緩存系統(tǒng)(如 Redis),提高接口響應(yīng)速度
- 添加 API 限流功能,防止惡意請求
- 實(shí)現(xiàn)實(shí)例信息的導(dǎo)出功能,支持?jǐn)?shù)據(jù)報(bào)表生成
這個(gè)實(shí)現(xiàn)提供了一個(gè)完整的 Django+Celery 定時(shí)拉取阿里云 RocketMQ 實(shí)例信息的解決方案,使用 MySQL 存儲數(shù)據(jù),支持權(quán)限控制和手動觸發(fā)同步,可直接用于生產(chǎn)環(huán)境。
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
Python深度學(xué)習(xí)pytorch神經(jīng)網(wǎng)絡(luò)多層感知機(jī)簡潔實(shí)現(xiàn)
這篇文章主要為大家講解了Python深層學(xué)習(xí)中pytorch神經(jīng)網(wǎng)絡(luò)多層感知機(jī)的簡潔實(shí)現(xiàn)方式,有需要的朋友可以借鑒參考下,希望能夠有所幫助2021-10-10
python Django里CSRF 對應(yīng)策略詳解
這篇文章主要介紹了python Django里CSRF 對應(yīng)策略詳解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-08-08
mac下給python3安裝requests庫和scrapy庫的實(shí)例
今天小編就為大家分享一篇mac下給python3安裝requests庫和scrapy庫的實(shí)例,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-06-06
Python辦公自動化之定時(shí)郵件提醒和音視頻文件處理
這篇文章主要為大家詳細(xì)介紹了Python辦公自動化中定時(shí)郵件提醒和音視頻文件處理的相關(guān)知識,文中的示例代碼講解詳細(xì),需要的小伙伴可以了解下2023-12-12
一文詳解如何實(shí)現(xiàn)PyTorch模型編譯
這篇文章主要為大家介紹了如何實(shí)現(xiàn)PyTorch?模型編譯詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-04-04
Python函數(shù)返回多個(gè)值的多種方法小結(jié)
在Python中,函數(shù)通常用于封裝一段代碼,使其可以重復(fù)調(diào)用,有時(shí),我們希望一個(gè)函數(shù)能夠返回多個(gè)值,Python提供了幾種不同的方法來實(shí)現(xiàn)這一點(diǎn),需要的朋友可以參考下2025-05-05

