欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

django+celery如何實(shí)現(xiàn)定時(shí)拉取阿里云rocketmq實(shí)例信息

 更新時(shí)間:2025年07月02日 09:46:17   作者:alden_ygq  
這篇文章主要介紹了django+celery如何實(shí)現(xiàn)定時(shí)拉取阿里云rocketmq實(shí)例信息,具有很好的參考價(jià)值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教

一、項(xiàng)目初始化

1. 創(chuàng)建虛擬環(huán)境并安裝依賴

# 創(chuàng)建虛擬環(huán)境
python3 -m venv env
source env/bin/activate

# 安裝依賴
pip install django celery redis django-celery-beat aliyun-python-sdk-core-v3 aliyun-python-sdk-mq mysqlclient

2. 創(chuàng)建 Django 項(xiàng)目和應(yīng)用

# 創(chuàng)建項(xiàng)目
django-admin startproject rocketmq_manager
cd rocketmq_manager

# 創(chuàng)建應(yīng)用
python manage.py startapp rocketmq

3. 配置 MySQL 數(shù)據(jù)庫(rocketmq_manager/settings.py)

DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.mysql',
        'NAME': 'rocketmq_manager',      # 數(shù)據(jù)庫名
        'USER': 'your_username',         # 用戶名
        'PASSWORD': 'your_password',     # 密碼
        'HOST': 'localhost',             # 主機(jī)
        'PORT': '3306',                  # 端口
        'OPTIONS': {
            'init_command': "SET sql_mode='STRICT_TRANS_TABLES'",
        },
    }
}

4. 配置項(xiàng)目其他設(shè)置(rocketmq_manager/settings.py)

INSTALLED_APPS = [
    # ...
    'django_celery_beat',
    'django_celery_results',
    'rocketmq',
]

# Celery配置
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'django-db'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Shanghai'

# 阿里云配置(從環(huán)境變量獲取)
ALIYUN_ACCESS_KEY_ID = os.environ.get('ALIYUN_ACCESS_KEY_ID')
ALIYUN_ACCESS_KEY_SECRET = os.environ.get('ALIYUN_ACCESS_KEY_SECRET')
ALIYUN_REGION_ID = os.environ.get('ALIYUN_REGION_ID', 'cn-hangzhou')

二、Celery 集成配置

1. 創(chuàng)建 Celery 應(yīng)用(rocketmq_manager/celery.py)

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'rocketmq_manager.settings')

app = Celery('rocketmq_manager')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

2. 初始化 Celery(rocketmq_manager/__init__.py)

from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app

__all__ = ('celery_app',)

三、Model 開發(fā)

創(chuàng)建 RocketMQ 實(shí)例模型(rocketmq/models.py)

python

運(yùn)行

from django.db import models
from django.utils import timezone

class RocketMQInstance(models.Model):
    instance_id = models.CharField('實(shí)例ID', max_length=100, unique=True)
    instance_name = models.CharField('實(shí)例名稱', max_length=200, blank=True, null=True)
    instance_type = models.CharField('實(shí)例類型', max_length=50, blank=True, null=True)
    region_id = models.CharField('區(qū)域ID', max_length=50)
    status = models.CharField('狀態(tài)', max_length=50, blank=True, null=True)
    create_time = models.DateTimeField('創(chuàng)建時(shí)間', blank=True, null=True)
    expire_time = models.DateTimeField('過期時(shí)間', blank=True, null=True)
    tags = models.JSONField('標(biāo)簽', blank=True, null=True)
    last_updated = models.DateTimeField('最后更新時(shí)間', auto_now=True)
    
    def __str__(self):
        return f"{self.instance_name} ({self.instance_id})"
    
    class Meta:
        verbose_name = 'RocketMQ實(shí)例'
        verbose_name_plural = 'RocketMQ實(shí)例列表'
        indexes = [
            models.Index(fields=['instance_id', 'region_id']),
        ]

class InstanceSyncLog(models.Model):
    sync_time = models.DateTimeField('同步時(shí)間', auto_now_add=True)
    instance_count = models.IntegerField('實(shí)例數(shù)量', default=0)
    success = models.BooleanField('是否成功', default=True)
    error_message = models.TextField('錯誤信息', blank=True, null=True)
    execution_time = models.FloatField('執(zhí)行時(shí)間(秒)', blank=True, null=True)
    
    def __str__(self):
        return f"同步記錄 - {self.sync_time}"
    
    class Meta:
        verbose_name = '實(shí)例同步日志'
        verbose_name_plural = '實(shí)例同步日志列表'
        ordering = ['-sync_time']

遷移數(shù)據(jù)庫

python manage.py makemigrations
python manage.py migrate

四、定時(shí)任務(wù)代碼

創(chuàng)建阿里云 API 客戶端(rocketmq/aliyun_client.py)

import os
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.acs_exception.exceptions import ClientException
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkmq.model.v20190513 import DescribeInstancesRequest
import json
import time

class AliyunRocketMQClient:
    def __init__(self):
        self.access_key_id = os.environ.get('ALIYUN_ACCESS_KEY_ID')
        self.access_key_secret = os.environ.get('ALIYUN_ACCESS_KEY_SECRET')
        self.region_id = os.environ.get('ALIYUN_REGION_ID', 'cn-hangzhou')
        self.client = AcsClient(self.access_key_id, self.access_key_secret, self.region_id)
    
    def get_instances(self):
        try:
            request = DescribeInstancesRequest.DescribeInstancesRequest()
            request.set_accept_format('json')
            
            # 添加重試機(jī)制
            max_retries = 3
            for attempt in range(max_retries):
                try:
                    response = self.client.do_action_with_exception(request)
                    return json.loads(response)
                except (ClientException, ServerException) as e:
                    if attempt < max_retries - 1:
                        wait_time = (attempt + 1) * 2
                        print(f"請求失敗,{wait_time}秒后重試: {str(e)}")
                        time.sleep(wait_time)
                    else:
                        raise
            
        except Exception as e:
            print(f"獲取實(shí)例信息失敗: {str(e)}")
            raise

定義定時(shí)任務(wù)(rocketmq/tasks.py)

from celery import shared_task
from .models import RocketMQInstance, InstanceSyncLog
from .aliyun_client import AliyunRocketMQClient
import logging
from datetime import datetime
import time

logger = logging.getLogger(__name__)

@shared_task(bind=True, autoretry_for=(Exception,), retry_backoff=5, retry_kwargs={'max_retries': 3})
def sync_rocketmq_instances(self):
    start_time = time.time()
    try:
        client = AliyunRocketMQClient()
        response = client.get_instances()
        
        # 處理響應(yīng)數(shù)據(jù)
        instance_list = []
        if 'Data' in response and 'InstanceDoList' in response['Data']:
            for item in response['Data']['InstanceDoList']:
                instance = {
                    'instance_id': item.get('InstanceId', ''),
                    'instance_name': item.get('InstanceName', ''),
                    'instance_type': item.get('InstanceType', ''),
                    'region_id': item.get('RegionId', ''),
                    'status': item.get('InstanceStatus', ''),
                    'create_time': datetime.fromtimestamp(item.get('CreateTime', 0) / 1000) if item.get('CreateTime') else None,
                    'expire_time': datetime.fromtimestamp(item.get('ExpireTime', 0) / 1000) if item.get('ExpireTime') else None,
                    'tags': item.get('Tags', {})
                }
                instance_list.append(instance)
        
        # 使用事務(wù)批量更新數(shù)據(jù)庫
        from django.db import transaction
        with transaction.atomic():
            # 先刪除不存在的實(shí)例(可選)
            # existing_ids = [item['instance_id'] for item in instance_list]
            # RocketMQInstance.objects.exclude(instance_id__in=existing_ids).delete()
            
            # 批量更新或創(chuàng)建實(shí)例
            for instance_data in instance_list:
                RocketMQInstance.objects.update_or_create(
                    instance_id=instance_data['instance_id'],
                    defaults=instance_data
                )
        
        execution_time = time.time() - start_time
        
        # 記錄同步日志
        log = InstanceSyncLog.objects.create(
            instance_count=len(instance_list),
            success=True,
            execution_time=execution_time
        )
        
        logger.info(f"成功同步 {len(instance_list)} 個(gè)RocketMQ實(shí)例,耗時(shí): {execution_time:.2f}秒")
        return f"同步完成,共 {len(instance_list)} 個(gè)實(shí)例,耗時(shí): {execution_time:.2f}秒"
        
    except Exception as e:
        execution_time = time.time() - start_time
        
        # 記錄錯誤日志
        InstanceSyncLog.objects.create(
            success=False,
            error_message=str(e),
            execution_time=execution_time
        )
        
        logger.error(f"同步RocketMQ實(shí)例失敗: {str(e)},耗時(shí): {execution_time:.2f}秒")
        raise

五、接口開發(fā)

1. 創(chuàng)建序列化器(rocketmq/serializers.py)

from rest_framework import serializers
from .models import RocketMQInstance, InstanceSyncLog

class RocketMQInstanceSerializer(serializers.ModelSerializer):
    class Meta:
        model = RocketMQInstance
        fields = '__all__'
        read_only_fields = ['last_updated']

class InstanceSyncLogSerializer(serializers.ModelSerializer):
    class Meta:
        model = InstanceSyncLog
        fields = '__all__'
        read_only_fields = ['sync_time', 'instance_count', 'success', 'error_message', 'execution_time']

2. 創(chuàng)建視圖集(rocketmq/views.py)

from rest_framework import viewsets, status
from rest_framework.response import Response
from .models import RocketMQInstance, InstanceSyncLog
from .serializers import RocketMQInstanceSerializer, InstanceSyncLogSerializer
from .tasks import sync_rocketmq_instances
from rest_framework.decorators import action
from rest_framework.permissions import IsAuthenticated
from rest_framework.authentication import TokenAuthentication

class RocketMQInstanceViewSet(viewsets.ModelViewSet):
    queryset = RocketMQInstance.objects.all()
    serializer_class = RocketMQInstanceSerializer
    authentication_classes = [TokenAuthentication]
    permission_classes = [IsAuthenticated]
    
    @action(detail=False, methods=['post'])
    def sync_now(self, request):
        """立即觸發(fā)實(shí)例同步"""
        task = sync_rocketmq_instances.delay()
        return Response({'task_id': task.id, 'message': '同步任務(wù)已啟動'}, status=status.HTTP_202_ACCEPTED)
    
    @action(detail=False, methods=['get'])
    def regions(self, request):
        """獲取所有區(qū)域列表"""
        regions = RocketMQInstance.objects.values_list('region_id', flat=True).distinct()
        return Response(regions, status=status.HTTP_200_OK)

class InstanceSyncLogViewSet(viewsets.ReadOnlyModelViewSet):
    queryset = InstanceSyncLog.objects.all().order_by('-sync_time')
    serializer_class = InstanceSyncLogSerializer
    authentication_classes = [TokenAuthentication]
    permission_classes = [IsAuthenticated]

3. 配置 URL(rocketmq/urls.py)

from django.urls import include, path
from rest_framework import routers
from .views import RocketMQInstanceViewSet, InstanceSyncLogViewSet

router = routers.DefaultRouter()
router.register(r'instances', RocketMQInstanceViewSet)
router.register(r'sync-logs', InstanceSyncLogViewSet)

urlpatterns = [
    path('', include(router.urls)),
]

4. 項(xiàng)目 URL 配置(rocketmq_manager/urls.py)

from django.contrib import admin
from django.urls import path, include
from rest_framework.authtoken.views import obtain_auth_token

urlpatterns = [
    path('admin/', admin.site.urls),
    path('api/', include('rocketmq.urls')),
    path('api/token/', obtain_auth_token, name='api_token_auth'),  # 獲取認(rèn)證令牌
]

六、配置定時(shí)任務(wù)

在settings.py中添加定時(shí)任務(wù)配置

CELERY_BEAT_SCHEDULE = {
    'sync-rocketmq-instances': {
        'task': 'rocketmq.tasks.sync_rocketmq_instances',
        'schedule': 3600.0,  # 每小時(shí)執(zhí)行一次
        'args': ()
    },
}

七、啟動服務(wù)

1. 設(shè)置環(huán)境變量

export ALIYUN_ACCESS_KEY_ID=your_access_key_id
export ALIYUN_ACCESS_KEY_SECRET=your_access_key_secret
export ALIYUN_REGION_ID=cn-hangzhou  # 根據(jù)實(shí)際情況修改

2. 啟動 Redis

redis-server

3. 啟動 Celery Worker

celery -A rocketmq_manager worker --loglevel=info --pool=prefork --concurrency=4

4. 啟動 Celery Beat

celery -A rocketmq_manager beat --loglevel=info --scheduler django_celery_beat.schedulers:DatabaseScheduler

5. 啟動 Django 開發(fā)服務(wù)器

python manage.py runserver

八、API 測試

1. 獲取認(rèn)證令牌

curl -X POST -d "username=your_username&password=your_password" http://localhost:8000/api/token/

2. 獲取 RocketMQ 實(shí)例列表

curl -H "Authorization: Token your_token_here" http://localhost:8000/api/instances/

3. 獲取同步日志

curl -H "Authorization: Token your_token_here" http://localhost:8000/api/sync-logs/

4. 手動觸發(fā)同步

curl -X POST -H "Authorization: Token your_token_here" http://localhost:8000/api/instances/sync_now/

項(xiàng)目結(jié)構(gòu)

rocketmq_manager/
├── rocketmq_manager/
│   ├── __init__.py
│   ├── celery.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
├── rocketmq/
│   ├── migrations/
│   ├── __init__.py
│   ├── admin.py
│   ├── apps.py
│   ├── aliyun_client.py
│   ├── models.py
│   ├── serializers.py
│   ├── tasks.py
│   ├── urls.py
│   └── views.py
├── manage.py
└── db.sqlite3

關(guān)鍵特性說明

  1. MySQL 存儲:使用 MySQL 數(shù)據(jù)庫存儲 RocketMQ 實(shí)例信息和同步日志
  2. 定時(shí)同步:每小時(shí)自動拉取阿里云 RocketMQ 實(shí)例信息
  3. 數(shù)據(jù)持久化:將實(shí)例信息存儲到數(shù)據(jù)庫,支持索引加速查詢
  4. 手動觸發(fā):提供 API 接口支持手動觸發(fā)同步
  5. 錯誤處理:任務(wù)失敗自動重試,記錄詳細(xì)的同步日志和執(zhí)行時(shí)間
  6. 權(quán)限控制:使用 Token 認(rèn)證保護(hù) API 接口

擴(kuò)展建議

  1. 添加更多阿里云 API 調(diào)用,獲取更詳細(xì)的實(shí)例指標(biāo)(如 TPS、消息堆積量等)
  2. 實(shí)現(xiàn)多區(qū)域支持,同時(shí)監(jiān)控多個(gè)地域的 RocketMQ 實(shí)例
  3. 添加告警機(jī)制,當(dāng)實(shí)例狀態(tài)異?;蛲绞r(shí)發(fā)送通知
  4. 集成緩存系統(tǒng)(如 Redis),提高接口響應(yīng)速度
  5. 添加 API 限流功能,防止惡意請求
  6. 實(shí)現(xiàn)實(shí)例信息的導(dǎo)出功能,支持?jǐn)?shù)據(jù)報(bào)表生成

這個(gè)實(shí)現(xiàn)提供了一個(gè)完整的 Django+Celery 定時(shí)拉取阿里云 RocketMQ 實(shí)例信息的解決方案,使用 MySQL 存儲數(shù)據(jù),支持權(quán)限控制和手動觸發(fā)同步,可直接用于生產(chǎn)環(huán)境。

總結(jié)

以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • Python深度學(xué)習(xí)pytorch神經(jīng)網(wǎng)絡(luò)多層感知機(jī)簡潔實(shí)現(xiàn)

    Python深度學(xué)習(xí)pytorch神經(jīng)網(wǎng)絡(luò)多層感知機(jī)簡潔實(shí)現(xiàn)

    這篇文章主要為大家講解了Python深層學(xué)習(xí)中pytorch神經(jīng)網(wǎng)絡(luò)多層感知機(jī)的簡潔實(shí)現(xiàn)方式,有需要的朋友可以借鑒參考下,希望能夠有所幫助
    2021-10-10
  • python Django里CSRF 對應(yīng)策略詳解

    python Django里CSRF 對應(yīng)策略詳解

    這篇文章主要介紹了python Django里CSRF 對應(yīng)策略詳解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-08-08
  • mac下給python3安裝requests庫和scrapy庫的實(shí)例

    mac下給python3安裝requests庫和scrapy庫的實(shí)例

    今天小編就為大家分享一篇mac下給python3安裝requests庫和scrapy庫的實(shí)例,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2018-06-06
  • Python利用PyPDF2庫處理PDF文件的基本操作

    Python利用PyPDF2庫處理PDF文件的基本操作

    PyPDF2是一個(gè)Python庫,用于處理PDF文件,包括合并、分割、旋轉(zhuǎn)和提取文本等操作,它是一個(gè)功能強(qiáng)大且靈活的工具,可用于自動化處理PDF文件,適用于各種應(yīng)用,從文檔管理到數(shù)據(jù)分析,本文將深入介紹PyPDF2庫,掌握如何利用它來處理PDF文件,需要的朋友可以參考下
    2023-11-11
  • Python辦公自動化之定時(shí)郵件提醒和音視頻文件處理

    Python辦公自動化之定時(shí)郵件提醒和音視頻文件處理

    這篇文章主要為大家詳細(xì)介紹了Python辦公自動化中定時(shí)郵件提醒和音視頻文件處理的相關(guān)知識,文中的示例代碼講解詳細(xì),需要的小伙伴可以了解下
    2023-12-12
  • 8個(gè)python新手入門項(xiàng)目

    8個(gè)python新手入門項(xiàng)目

    文將介紹8個(gè)帶有代碼的Python項(xiàng)目,這些項(xiàng)目將幫助大家增強(qiáng)編程能力,這些項(xiàng)目涵蓋了各種主題和難度級別,助力大家成長為一個(gè)Python開發(fā)者
    2024-01-01
  • 一文詳解如何實(shí)現(xiàn)PyTorch模型編譯

    一文詳解如何實(shí)現(xiàn)PyTorch模型編譯

    這篇文章主要為大家介紹了如何實(shí)現(xiàn)PyTorch?模型編譯詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-04-04
  • Python函數(shù)返回多個(gè)值的多種方法小結(jié)

    Python函數(shù)返回多個(gè)值的多種方法小結(jié)

    在Python中,函數(shù)通常用于封裝一段代碼,使其可以重復(fù)調(diào)用,有時(shí),我們希望一個(gè)函數(shù)能夠返回多個(gè)值,Python提供了幾種不同的方法來實(shí)現(xiàn)這一點(diǎn),需要的朋友可以參考下
    2025-05-05
  • 總結(jié)Python常用的魔法方法

    總結(jié)Python常用的魔法方法

    今天帶大家學(xué)習(xí)Python的相關(guān)知識,文中對Python常用的魔法方法作了非常詳細(xì)的總結(jié),對正在學(xué)習(xí)python的小伙伴們有很好地幫助,需要的朋友可以參考下
    2021-05-05
  • 使用Python操作excel文件的實(shí)例代碼

    使用Python操作excel文件的實(shí)例代碼

    這篇文章主要介紹了使用Python操作excel文件的實(shí)例代碼,需要的朋友可以參考下
    2017-10-10

最新評論