Spring?Cloud實(shí)現(xiàn)灰度發(fā)布的示例代碼
一、背景
通過對請求標(biāo)記分組,實(shí)現(xiàn)請求在灰度服務(wù)的分組中流轉(zhuǎn),當(dāng)微服務(wù)鏈路內(nèi)無灰度分組對應(yīng)的下游服務(wù)時,用主線分組中對應(yīng)的微服務(wù)提供服務(wù)。
1、應(yīng)用場景
(1)A/B Testing
線上環(huán)境實(shí)現(xiàn)A/B Testing,期望在生產(chǎn)環(huán)境通過內(nèi)測用戶驗(yàn)證無誤后再全量發(fā)布給所有用戶使用。
(2)多版本開發(fā)測試調(diào)試
多個版本并行開發(fā)時,需要為每個版本準(zhǔn)備一整套開發(fā)環(huán)境。如果版本較多,開發(fā)環(huán)境成本會非常大。分組隔離可以在多版本開發(fā)測試時大幅度降低資源成本,并實(shí)現(xiàn)開發(fā)機(jī)加入測試環(huán)境完成本地代碼調(diào)試。
2、需要解決的問題
現(xiàn)有的灰度發(fā)布工具可以實(shí)現(xiàn)同步調(diào)用鏈路的流量按請求標(biāo)識在響應(yīng)的服務(wù)分組內(nèi)流轉(zhuǎn),但是存在兩個異步調(diào)用鏈路問題導(dǎo)致灰度請求無法在灰度環(huán)境中流轉(zhuǎn)完畢:
(1)異步線程
鏈路中存在異步線程調(diào)用下游服務(wù)時,請求中灰度分組標(biāo)識會丟失,導(dǎo)致灰度請求被流轉(zhuǎn)到主線分組中處理,灰度分組無法正常接收異步線程調(diào)用的請求;
(2)異步消息
當(dāng)鏈路中請求產(chǎn)生mq消息后,因灰度分組和主線分組內(nèi)消息消費(fèi)方監(jiān)聽同一隊(duì)列導(dǎo)致消息流轉(zhuǎn)混亂,易出現(xiàn)問題:消息處理邏輯不能兼容、消息丟失(因同一隊(duì)列在同一訂閱組內(nèi)訂閱規(guī)則可能不一致)等;
二、方案實(shí)現(xiàn)
方案實(shí)現(xiàn)前提:在項(xiàng)目中使用Nacos,Spring Cloud OpenFeign、Spring Cloud Gateway,RoketMq
1、自定義SpringMVC攔截器
將http請求中的灰度分組標(biāo)識寫入當(dāng)前本地線程ThreadLocal中,ThreadLocal采用Alibaba開源的TransmittableThreadLocal增強(qiáng),解決當(dāng)前請求中存在異步線程調(diào)用下游服務(wù)時,請求中灰度分組標(biāo)識會丟失,導(dǎo)致灰度請求被流轉(zhuǎn)到主線分組中處理的問題。
(1)攔截器實(shí)現(xiàn)
package com.easyhome.common.feign; import com.easyhome.common.utils.GrayscaleConstant; import lombok.extern.slf4j.Slf4j; import org.springframework.lang.Nullable; import org.springframework.util.StringUtils; import org.springframework.web.servlet.HandlerInterceptor; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.util.Enumeration; import java.util.HashMap; import java.util.Map; /** * 請求分組參數(shù)攔截器 * @author wangshufeng */ @Slf4j public class TransmitHeaderPrintLogHanlerInterceptor implements HandlerInterceptor { @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { Map<String,String> param=new HashMap<>(8); //獲取所有灰度參數(shù)值設(shè)置到ThreadLocal,以便傳值 for (GrayHeaderParam item:GrayHeaderParam.values()) { String hParam = request.getHeader(item.getValue()); if(!StringUtils.isEmpty(hParam)){ param.put(item.getValue(), hParam); } } GrayParamHolder.putValues(param); return true; } @Override public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, @Nullable Exception ex) throws Exception { //清除灰度ThreadLocal GrayParamHolder.clearValue(); } }
(2)ThreadLocal增強(qiáng)工具類
package com.easyhome.common.feign; import com.alibaba.ttl.TransmittableThreadLocal; import com.easyhome.common.utils.GrayUtil; import com.easyhome.common.utils.GrayscaleConstant; import java.util.HashMap; import java.util.Map; import java.util.Objects; /** * 異步線程間參數(shù)傳遞 * * @author wangshufeng */ public class GrayParamHolder { /** * 在Java的啟動參數(shù)加上:-javaagent:path/to/transmittable-thread-local-2.x.y.jar。 * <p> * 注意: * <p> * 如果修改了下載的TTL的Jar的文件名(transmittable-thread-local-2.x.y.jar),則需要自己手動通過-Xbootclasspath JVM參數(shù)來顯式配置。 * 比如修改文件名成ttl-foo-name-changed.jar,則還需要加上Java的啟動參數(shù):-Xbootclasspath/a:path/to/ttl-foo-name-changed.jar。 * 或使用v2.6.0之前的版本(如v2.5.1),則也需要自己手動通過-Xbootclasspath JVM參數(shù)來顯式配置(就像TTL之前的版本的做法一樣)。 * 加上Java的啟動參數(shù):-Xbootclasspath/a:path/to/transmittable-thread-local-2.5.1.jar。 */ private static ThreadLocal<Map<String, String>> paramLocal = new TransmittableThreadLocal(); /** * 獲取單個參數(shù)值 * * @param key * @return */ public static String getValue(String key) { Map<String, String> paramMap = GrayParamHolder.paramLocal.get(); if (Objects.nonNull(paramMap) && !paramMap.isEmpty()) { return paramMap.get(key); } return null; } /** * 獲取所有參數(shù) * * @return */ public static Map<String, String> getGrayMap() { Map<String, String> paramMap = GrayParamHolder.paramLocal.get(); if(paramMap==null){ paramMap=new HashMap<>(8); if(GrayUtil.isGrayPod()){ paramMap.put(GrayscaleConstant.HEADER_KEY, GrayscaleConstant.HEADER_VALUE); paramMap.put(GrayscaleConstant.PRINT_HEADER_LOG_KEY, GrayscaleConstant.STR_BOOLEAN_TRUE); GrayParamHolder.paramLocal.set(paramMap); } } return paramMap; } /** * 設(shè)置單個參數(shù) * * @param key * @param value */ public static void putValue(String key, String value) { Map<String, String> paramMap = GrayParamHolder.paramLocal.get(); if (Objects.isNull(paramMap) || paramMap.isEmpty()) { paramMap = new HashMap<>(6); GrayParamHolder.paramLocal.set(paramMap); } paramMap.put(key, value); } /** * 設(shè)置單多個參數(shù) * * @param map */ public static void putValues(Map<String,String> map) { Map<String, String> paramMap = GrayParamHolder.paramLocal.get(); if (Objects.isNull(paramMap) || paramMap.isEmpty()) { paramMap = new HashMap<>(6); GrayParamHolder.paramLocal.set(paramMap); } if(Objects.nonNull(map)&&!map.isEmpty()){ for (Map.Entry<String,String> item:map.entrySet()){ paramMap.put(item.getKey(),item.getValue()); } } } /** * 清空線程參數(shù) */ public static void clearValue() { GrayParamHolder.paramLocal.remove(); } }
(3)啟動加載攔截器
package com.easyhome.common.feign; import org.springframework.context.annotation.Configuration; import org.springframework.web.servlet.config.annotation.InterceptorRegistry; import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; /** * 請求分組參數(shù)攔截器加載配置 * @author wangshufeng */ @Configuration public class TransmitHeaderPrintLogConfig implements WebMvcConfigurer { /** * 配置攔截規(guī)則與注入攔截器 * @param registry */ @Override public void addInterceptors(InterceptorRegistry registry) { // addPathPattern 添加攔截規(guī)則 /** 攔截所有包括靜態(tài)資源 // excludePathPattern 排除攔截規(guī)則 所以我們需要放開靜態(tài)資源的攔截 registry.addInterceptor(new TransmitHeaderPrintLogHanlerInterceptor()) .addPathPatterns("/**"); } }
2、自定義Feign攔截器
將自定義SpringMVC攔截器中放入ThreadLocal的灰度分組標(biāo)識傳遞給下游服務(wù)。
package com.easyhome.common.feign; import com.easyhome.common.utils.GrayscaleConstant; import feign.RequestInterceptor; import feign.RequestTemplate; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Configuration; import org.springframework.util.StringUtils; import java.util.Map; import java.util.Objects; /** * feign傳遞請求頭信息攔截器 * * @author wangshufeng */ @Slf4j @Configuration public class FeignTransmitHeadersRequestInterceptor implements RequestInterceptor { @Override public void apply(RequestTemplate requestTemplate) { Map<String,String> attributes=GrayParamHolder.getGrayMap(); if (Objects.nonNull(attributes)) { //灰度標(biāo)識傳遞 String version = attributes.get(GrayscaleConstant.HEADER_KEY); if(!StringUtils.isEmpty(version)){ requestTemplate.header(GrayscaleConstant.HEADER_KEY, version); } //自定義一些在鏈路中需要一直攜帶的通用參數(shù) //userId傳遞 String userId = attributes.get(GrayscaleConstant.USER_ID); if(!StringUtils.isEmpty(userId)){ requestTemplate.header(GrayscaleConstant.USER_ID, userId); } String dwLang = attributes.get(GrayscaleConstant.DW_LANG); if(!StringUtils.isEmpty(dwLang)){ requestTemplate.header(GrayscaleConstant.DW_LANG, dwLang); } String deviceOs = attributes.get(GrayscaleConstant.DEVICE_OS); if(!StringUtils.isEmpty(deviceOs)){ requestTemplate.header(GrayscaleConstant.DEVICE_OS, deviceOs); } } } }
3、自定義負(fù)載策略
(1)負(fù)載策略實(shí)現(xiàn)
通過請求中的分組標(biāo)識選擇對應(yīng)分組的服務(wù)列表,實(shí)現(xiàn)請求在灰度服務(wù)的分組中流轉(zhuǎn),當(dāng)微服務(wù)鏈路內(nèi)無對應(yīng)分組的下游服務(wù)存活時,用主線分組中對應(yīng)的微服務(wù)提供服務(wù)。
基于com.alibaba.cloud.nacos.ribbon.NacosRule重寫
package com.easyhome.common.nacos.ribbon; import com.alibaba.cloud.nacos.NacosDiscoveryProperties; import com.alibaba.cloud.nacos.ribbon.ExtendBalancer; import com.alibaba.cloud.nacos.ribbon.NacosServer; import com.alibaba.nacos.api.naming.NamingService; import com.alibaba.nacos.api.naming.pojo.Instance; import com.easyhome.common.utils.GrayUtil; import com.easyhome.common.utils.GrayscaleConstant; import com.netflix.client.config.IClientConfig; import com.netflix.loadbalancer.AbstractLoadBalancerRule; import com.netflix.loadbalancer.DynamicServerListLoadBalancer; import com.netflix.loadbalancer.Server; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.util.CollectionUtils; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; /** * nacos自定義負(fù)載策略 * * @author wangshufeng */ @Slf4j public class NacosRule extends AbstractLoadBalancerRule { @Autowired private NacosDiscoveryProperties nacosDiscoveryProperties; @Override public Server choose(Object key) { try { String clusterName = this.nacosDiscoveryProperties.getClusterName(); DynamicServerListLoadBalancer loadBalancer = (DynamicServerListLoadBalancer) getLoadBalancer(); String name = loadBalancer.getName(); NamingService namingService = nacosDiscoveryProperties.namingServiceInstance(); List<Instance> instances = namingService.selectInstances(name, true); instances = this.getGrayFilterInstances(instances, key); if (CollectionUtils.isEmpty(instances)) { log.warn("no instance in service {}", name); return null; } List<Instance> instancesToChoose = instances; if (StringUtils.isNotBlank(clusterName)) { List<Instance> sameClusterInstances = instances.stream() .filter(instance -> Objects.equals(clusterName, instance.getClusterName())) .collect(Collectors.toList()); if (!CollectionUtils.isEmpty(sameClusterInstances)) { instancesToChoose = sameClusterInstances; } else { log.warn( "A cross-cluster call occurs,name = {}, clusterName = {}, instance = {}", name, clusterName, instances); } } Instance instance = ExtendBalancer.getHostByRandomWeight2(instancesToChoose); return new NacosServer(instance); } catch (Exception e) { log.warn("NacosRule error", e); return null; } } /** * 根據(jù)當(dāng)前請求是否為灰度過濾服務(wù)實(shí)例列表 * * @param instances * @return List<Instance> */ private List<Instance> getGrayFilterInstances(List<Instance> instances, Object key) { if (CollectionUtils.isEmpty(instances)) { return instances; } else { //是否灰度請求 Boolean isGrayRequest; String grayGroup=GrayscaleConstant.HEADER_VALUE; //兼容gateway傳值方式,gateway是nio是通過key來做負(fù)載實(shí)例識別的 if (Objects.nonNull(key) && !GrayscaleConstant.DEFAULT.equals(key)) { isGrayRequest = true; if(isGrayRequest){ grayGroup=(String)key; } } else { isGrayRequest = GrayUtil.isGrayRequest(); if(isGrayRequest){ grayGroup=GrayUtil.requestGroup(); } } List<Instance> prodInstance=new ArrayList<>(); List<Instance> grayInstance=new ArrayList<>(); for(Instance item:instances){ Map<String, String> metadata = item.getMetadata(); if (metadata.isEmpty() || !GrayscaleConstant.STR_BOOLEAN_TRUE.equals(metadata.get(GrayscaleConstant.POD_GRAY))) { prodInstance.add(item); } if (isGrayRequest) { if (!metadata.isEmpty() && GrayscaleConstant.STR_BOOLEAN_TRUE.equals(metadata.get(GrayscaleConstant.POD_GRAY))) { if(Objects.equals(grayGroup,metadata.get(GrayscaleConstant.GRAY_GROUP))){ grayInstance.add(item); } } } } if(!isGrayRequest||CollectionUtils.isEmpty(grayInstance)){ return prodInstance; } return grayInstance; } } @Override public void initWithNiwsConfig(IClientConfig clientConfig) { } }
(2)啟動加載負(fù)載策略
package com.easyhome.common.nacos; import com.easyhome.common.nacos.ribbon.NacosRule; import com.netflix.loadbalancer.IRule; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Scope; /** * 灰度負(fù)載策略配置 * @author wangshufeng */ @Configuration public class BalancerRuleConfig { @Bean @Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE) public IRule getRule(){ return new NacosRule(); } }
4、注冊服務(wù)添加元數(shù)據(jù)信息
在服務(wù)啟動時向注冊中心注冊當(dāng)前服務(wù)所在服務(wù)分組信息,在自定義負(fù)載策略中通過識別服務(wù)元數(shù)據(jù)中服務(wù)分組信息進(jìn)行服務(wù)選擇。
package com.easyhome.common.nacos; import com.alibaba.cloud.nacos.ConditionalOnNacosDiscoveryEnabled; import com.alibaba.cloud.nacos.NacosDiscoveryProperties; import com.alibaba.cloud.nacos.discovery.NacosWatch; import com.easyhome.common.utils.GrayUtil; import com.easyhome.common.utils.GrayscaleConstant; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.AutoConfigureBefore; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.cloud.client.CommonsClientAutoConfiguration; import org.springframework.cloud.client.discovery.simple.SimpleDiscoveryClientAutoConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.Objects; /** * 注冊服務(wù)添加元數(shù)據(jù)信息 * * @author wangshufeng */ @Slf4j @Configuration @ConditionalOnNacosDiscoveryEnabled @AutoConfigureBefore({SimpleDiscoveryClientAutoConfiguration.class, CommonsClientAutoConfiguration.class}) public class NacosMetadataConfig { @Bean @ConditionalOnMissingBean @ConditionalOnProperty(value = {"spring.cloud.nacos.discovery.watch.enabled"}, matchIfMissing = true) public NacosWatch nacosWatch(NacosDiscoveryProperties nacosDiscoveryProperties) { String grayFlg = GrayUtil.isGrayPod().toString(); log.info("注冊服務(wù)添加元數(shù)據(jù):當(dāng)前實(shí)例是否為灰度環(huán)境-{}", grayFlg); nacosDiscoveryProperties.getMetadata().put(GrayscaleConstant.POD_GRAY, grayFlg); if(Objects.equals(grayFlg,GrayscaleConstant.STR_BOOLEAN_TRUE)){ String groupFlg = GrayUtil.podGroup(); nacosDiscoveryProperties.getMetadata().put(GrayscaleConstant.GRAY_GROUP, groupFlg); } return new NacosWatch(nacosDiscoveryProperties); } }
5、異步消息處理
采用消息雙隊(duì)列隔離消息的流轉(zhuǎn),消費(fèi)方通過識別消息來源隊(duì)列在調(diào)用下游服務(wù)時放入服務(wù)分組信息,達(dá)到鏈路的正確流轉(zhuǎn)。 消息消費(fèi)方灰度分組有實(shí)例運(yùn)行情況:
消息消費(fèi)方灰度分組實(shí)例下線情況:
(1)自定義灰度mq消息監(jiān)聽器
接收灰度隊(duì)列消息后在當(dāng)前線程中添加灰度流量分組標(biāo)識,保證在消息處理邏輯中調(diào)用下游服務(wù)時請求在對應(yīng)分組內(nèi)流轉(zhuǎn)。
package com.easyhome.common.rocketmq; import com.aliyun.openservices.ons.api.Action; import com.aliyun.openservices.ons.api.ConsumeContext; import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.MessageListener; import com.easyhome.common.feign.GrayParamHolder; import com.easyhome.common.utils.GrayscaleConstant; import lombok.extern.slf4j.Slf4j; /** * 灰度mq消息監(jiān)聽器 * 通過topic后綴判斷是否為灰度流量 * @author wangshufeng */ @Slf4j public final class GrayMessageListener implements MessageListener { private MessageListener messageListener; public GrayMessageListener(MessageListener messageListener) { this.messageListener = messageListener; } @Override public Action consume(Message message, ConsumeContext context) { if(message.getTopic().endsWith(GrayscaleConstant.GRAY_TOPIC_SUFFIX)){ GrayParamHolder.putValue(GrayscaleConstant.HEADER_KEY, GrayscaleConstant.HEADER_VALUE); GrayParamHolder.putValue(GrayscaleConstant.PRINT_HEADER_LOG_KEY, GrayscaleConstant.STR_BOOLEAN_TRUE); log.info("為當(dāng)前mq設(shè)置傳遞灰度標(biāo)識。"); } Action result= messageListener.consume(message,context); GrayParamHolder.clearValue(); return result; } }
(2)自定義spring灰度環(huán)境變更事件
package com.easyhome.common.event; import com.easyhome.common.rocketmq.ListenerStateEnum; import org.springframework.context.ApplicationEvent; /** * 灰度環(huán)境變更事件 * @author wangshufeng */ public class GrayEventChangeEvent extends ApplicationEvent { /** * Create a new {@code ApplicationEvent}. * * @param source the object on which the event initially occurred or with * which the event is associated (never {@code null}) */ public GrayEventChangeEvent(ListenerStateEnum source) { super(source); } }
(3)灰度實(shí)例上下線事件處理基礎(chǔ)類
定義spring灰度環(huán)境變更事件統(tǒng)一處理抽象類,RocketMq消費(fèi)者繼承此抽象類,實(shí)現(xiàn)當(dāng)前服務(wù)實(shí)例監(jiān)聽spring事件完成正式隊(duì)列和灰度隊(duì)列的監(jiān)聽自動切換。
package com.easyhome.common.rocketmq; import com.aliyun.openservices.ons.api.Consumer; import com.aliyun.openservices.ons.api.MessageListener; import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.PropertyKeyConst; import com.easyhome.common.event.GrayEventChangeEvent; import com.easyhome.common.utils.GrayUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationListener; import org.springframework.util.StringUtils; import javax.annotation.PreDestroy; import javax.annotation.Resource; import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.Properties; /** * 灰度實(shí)例上下線事件處理基礎(chǔ)類 * * @author wangshufeng */ @Slf4j public abstract class AbstractGrayEventListener implements ApplicationListener<GrayEventChangeEvent> { private Consumer consumer; private Consumer consumerGray; /** * 默認(rèn)訂閱tag規(guī)則 */ private static final String DEFAULT_SUB_EXPRESSION = "*"; private List<SubscriptionData> subscribes = new ArrayList<>(); private ListenerStateEnum currentState; private Properties mqProperties; @Resource private ApplicationContext applicationContext; /** * 初始化消費(fèi)者實(shí)例 */ public void initConsumer() { if (GrayUtil.isGrayPod()) { initConsumerGray(); } else { initConsumerProduction(); } } /** * 初始化生產(chǎn)消費(fèi)者實(shí)例 */ private void initConsumerProduction() { if (consumer == null) { synchronized (this) { if (consumer == null) { if (Objects.isNull(mqProperties)) { throw new NullPointerException("rocketMq配置信息未設(shè)置"); } else { consumer = ONSFactory.createConsumer(mqProperties); consumer.start(); } } } } } /** * 初始化灰度消費(fèi)者實(shí)例 */ private void initConsumerGray() { if (consumerGray == null) { synchronized (this) { if (consumerGray == null) { if (Objects.isNull(mqProperties)) { throw new NullPointerException("rocketMq配置信息未設(shè)置"); } else { Properties grayProperties = new Properties(); grayProperties.putAll(mqProperties); grayProperties.setProperty(PropertyKeyConst.GROUP_ID, GrayUtil.topicGrayName(grayProperties.getProperty(PropertyKeyConst.GROUP_ID))); consumerGray = ONSFactory.createConsumer(grayProperties); consumerGray.start(); } } } } } @Override public void onApplicationEvent(GrayEventChangeEvent event) { ListenerStateEnum listenerStateEnum = (ListenerStateEnum) event.getSource(); log.info(this.getClass().getName() + "灰度環(huán)境變更:" + listenerStateEnum.getValue()); currentState = listenerStateEnum; if (ListenerStateEnum.PRODUCTION.equals(listenerStateEnum)) { initConsumerProduction(); for (SubscriptionData item : subscribes) { if (Objects.nonNull(consumer)) { consumer.subscribe(item.getTopic(), item.getSubExpression(), item.getListener()); } } shutdownConsumerGray(); } if (ListenerStateEnum.TOGETHER.equals(listenerStateEnum)) { initConsumerProduction(); initConsumerGray(); for (SubscriptionData item : subscribes) { if (Objects.nonNull(consumer)) { consumer.subscribe(item.getTopic(), item.getSubExpression(), item.getListener()); } if (Objects.nonNull(consumerGray)) { consumerGray.subscribe(GrayUtil.topicGrayName(item.getTopic()), item.getSubExpression(), item.getListener()); } } } if (ListenerStateEnum.GRAYSCALE.equals(listenerStateEnum)) { initConsumerGray(); for (SubscriptionData item : subscribes) { if (Objects.nonNull(consumerGray)) { consumerGray.subscribe(GrayUtil.topicGrayName(item.getTopic()), item.getSubExpression(), item.getListener()); } } shutdownConsumerProduction(); } } /** * 添加訂閱規(guī)則 * * @param topic 主題 * @param listenerClass 處理消息監(jiān)聽器類名稱 * @return AbstractGrayEventListener */ public AbstractGrayEventListener subscribe(String topic, Class<? extends MessageListener> listenerClass) { return this.subscribe(topic, DEFAULT_SUB_EXPRESSION, listenerClass); } /** * 添加訂閱規(guī)則 * * @param topic 主題 * @param subExpression 訂閱tag規(guī)則 * @param listenerClass 處理消息監(jiān)聽器類名稱 * @return AbstractGrayEventListener */ public AbstractGrayEventListener subscribe(String topic, String subExpression, Class<? extends MessageListener> listenerClass) { if (Objects.isNull(listenerClass)) { throw new NullPointerException("listenerClass信息未設(shè)置"); } MessageListener listener = applicationContext.getBean(listenerClass); if (Objects.isNull(listener)) { throw new NullPointerException(listenerClass.getName().concat("未找到實(shí)例對象")); } return this.subscribe(topic, subExpression, listener); } /** * 添加訂閱規(guī)則 * * @param topic 主題 * @param listener 處理消息監(jiān)聽器 * @return AbstractGrayEventListener */ public AbstractGrayEventListener subscribe(String topic, MessageListener listener) { return this.subscribe(topic, DEFAULT_SUB_EXPRESSION, listener); } /** * 添加訂閱規(guī)則 * * @param topic 主題 * @param subExpression 訂閱tag規(guī)則 * @param listener 處理消息監(jiān)聽器 * @return AbstractGrayEventListener */ public AbstractGrayEventListener subscribe(String topic, String subExpression, MessageListener listener) { if (StringUtils.isEmpty(topic)) { throw new NullPointerException("topic信息未設(shè)置"); } if (StringUtils.isEmpty(subExpression)) { throw new NullPointerException("subExpression信息未設(shè)置"); } if (Objects.isNull(listener)) { throw new NullPointerException("listener信息未設(shè)置"); } if (listener instanceof GrayMessageListener) { subscribes.add(new SubscriptionData(topic, subExpression, listener)); } else { subscribes.add(new SubscriptionData(topic, subExpression, new GrayMessageListener(listener))); } return this; } /** * 設(shè)置RoketMq配置屬性 * * @param mqProperties 配置屬性 * @return AbstractGrayEventListener */ public AbstractGrayEventListener setMqProperties(Properties mqProperties) { this.mqProperties = mqProperties; return this; } /** * 銷毀方法 */ @PreDestroy public void shutdown() { shutdownConsumerProduction(); shutdownConsumerGray(); } /** * 銷毀生產(chǎn)消費(fèi)實(shí)例 */ private void shutdownConsumerProduction() { if (Objects.nonNull(consumer)) { consumer.shutdown(); consumer = null; } } /** * 銷毀灰度消費(fèi)者實(shí)例 */ private void shutdownConsumerGray() { if (Objects.nonNull(consumerGray)) { consumerGray.shutdown(); consumerGray = null; } } }
(4)nacos注冊中心服務(wù)列表變更事件監(jiān)聽器實(shí)現(xiàn)
監(jiān)聽nacos注冊中心服務(wù)列表發(fā)生變化的事件,識別當(dāng)前實(shí)例需要監(jiān)聽的消息隊(duì)列的類型,發(fā)出spring灰度環(huán)境變更事件通知所有mq消費(fèi)者完成監(jiān)聽隊(duì)列切換。
package com.easyhome.common.nacos; import com.alibaba.nacos.api.naming.listener.Event; import com.alibaba.nacos.api.naming.listener.EventListener; import com.alibaba.nacos.api.naming.listener.NamingEvent; import com.alibaba.nacos.api.naming.pojo.Instance; import com.easyhome.common.event.GrayEventChangeEvent; import com.easyhome.common.rocketmq.ListenerStateEnum; import com.easyhome.common.utils.GrayUtil; import com.easyhome.common.utils.GrayscaleConstant; import lombok.extern.slf4j.Slf4j; import org.springframework.context.ApplicationEventPublisher; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import javax.annotation.Resource; import java.util.List; /** * nacos自定義監(jiān)聽實(shí)現(xiàn) * * @author wangshufeng */ @Slf4j @Component public class NacosEventListener implements EventListener { @Resource private ApplicationEventPublisher publisher; @Override public void onEvent(Event event) { if (event instanceof NamingEvent) { this.mqInit(((NamingEvent) event).getInstances()); } } /** * 當(dāng)前的mq監(jiān)聽狀態(tài) */ private static ListenerStateEnum listenerMqState; public synchronized void mqInit(List<Instance> instances) { ListenerStateEnum newState; //當(dāng)前實(shí)例是灰度實(shí)例 if (GrayUtil.isGrayPod()) { newState = ListenerStateEnum.GRAYSCALE; } else { //判斷當(dāng)前服務(wù)有灰度實(shí)例 if (this.isHaveGray(instances)) { newState = ListenerStateEnum.PRODUCTION; } else { newState = ListenerStateEnum.TOGETHER; } } log.info("當(dāng)前實(shí)例是否為灰度環(huán)境:{}", GrayUtil.isGrayPod()); log.info("當(dāng)前實(shí)例監(jiān)聽mq隊(duì)列的狀態(tài):{}", newState.getValue()); //防止重復(fù)初始化監(jiān)聽mq隊(duì)列信息 if (!newState.equals(listenerMqState)) { listenerMqState = newState; publisher.publishEvent(new GrayEventChangeEvent(listenerMqState)); } } /** * 是否有灰度實(shí)例 * * @return */ private boolean isHaveGray(List<Instance> instances) { if (!CollectionUtils.isEmpty(instances)) { for (Instance instance : instances) { if (GrayscaleConstant.STR_BOOLEAN_TRUE.equals(instance.getMetadata().get(GrayscaleConstant.POD_GRAY))) { return true; } } } return false; } }
(5)加載nacos自定義監(jiān)聽器
package com.easyhome.common.nacos; import com.alibaba.cloud.nacos.NacosDiscoveryProperties; import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.naming.NamingFactory; import com.alibaba.nacos.api.naming.NamingService; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Configuration; import javax.annotation.PostConstruct; import javax.annotation.Resource; /** * 配置nacos自定義監(jiān)聽 * @author wangshufeng */ @Configuration @Slf4j public class NacosListenerConfig { @Resource NacosDiscoveryProperties nacosDiscoveryProperties; @Resource NacosEventListener nacosEventListener; @PostConstruct public void subscribe() { try { NamingService namingService = NamingFactory.createNamingService(nacosDiscoveryProperties.getServerAddr()); namingService.subscribe(nacosDiscoveryProperties.getService(),nacosDiscoveryProperties.getGroup(), nacosEventListener); log.info("配置nacos自定義監(jiān)聽完成"); } catch (NacosException e) { log.error("配置nacos自定義監(jiān)聽錯誤", e); } } }
三、使用方法
1、項(xiàng)目中引入easyhome-common-gray.jar
<dependency> <groupId>com.easyhome</groupId> <artifactId>easyhome-common-gray</artifactId> <version>1.0.2-RELEASE</version> </dependency>
2、 SpringBoot啟動類上添加掃描類路徑
@SpringBootApplication(scanBasePackages = {"com.easyhome.*" })
3、 定義RocketMq消費(fèi)者時,繼承AbstractGrayEventListener,示例代碼如下
/** * 商品事件消費(fèi) * @author wangshufeng */ @Component @Slf4j public class GoodsChangeEventConsumer extends AbstractGrayEventListener { @Resource private MqGoodsConfig mqConfig; @Resource private MqMarketingConfig mqMarketingConfig; /** * 消息訂閱 */ @PostConstruct public void consume() { this.subscribe(mqConfig.getGoodsEventTopic(), "*", GoodsChangeMessageListener.class) .subscribe(mqConfig.getShopEventTopic(), "*", ShopChangeMessageListener.class) .subscribe(this.mqMarketingConfig.getChangeTopic(), this.mqMarketingConfig.getChangeTag(), MarketingChangeMessageListener.class) .subscribe(mqConfig.getCategoryEventTopic(),"*", CategoryChangeMessageListener.class) .setMqProperties(mqConfig.getGoodsEventMsgMqProperties()).initConsumer(); } }
4、jvm 啟動參數(shù)添加如下
-Dpod.gray值為false時,啟動服務(wù)實(shí)例為主線分組實(shí)例,-Dgray.group無需設(shè)置;-Dpod.gray值為true時,啟動服務(wù)實(shí)例為灰度分組實(shí)例,-Dgray.group需設(shè)置當(dāng)前服務(wù)實(shí)例所屬分組
-javaagent:/home/easyhome/transmittable-thread-local-2.13.2.jar
-Dpod.gray=true -Dgray.group=自定義分組名稱
四、存在問題
目前消息只支持主線隊(duì)列和灰度隊(duì)列兩種隊(duì)列,多灰度組時灰度消息沒有分組隔離,后續(xù)版本解決。
以上就是Spring Cloud實(shí)現(xiàn)灰度發(fā)布的示例代碼的詳細(xì)內(nèi)容,更多關(guān)于Spring Cloud灰度發(fā)布的資料請關(guān)注腳本之家其它相關(guān)文章!
- Spring Cloud 優(yōu)雅下線以及灰度發(fā)布實(shí)現(xiàn)
- SpringCloud實(shí)現(xiàn)灰度發(fā)布的方法步驟
- springcloud+nacos實(shí)現(xiàn)灰度發(fā)布示例詳解
- 關(guān)于SpringCloud灰度發(fā)布的實(shí)現(xiàn)
- SpringCloud灰度發(fā)布的設(shè)計(jì)與實(shí)現(xiàn)詳解
- SpringCloud的全鏈路灰度發(fā)布方案詳解
- SpringCloud實(shí)現(xiàn)全鏈路灰度發(fā)布的示例詳解
- Spring Cloud Gateway實(shí)現(xiàn)灰度發(fā)布方案
相關(guān)文章
Mybatis查詢Sql結(jié)果未映射到對應(yīng)得實(shí)體類上的問題解決
使用mybatis查詢表數(shù)據(jù)得時候,發(fā)現(xiàn)對應(yīng)得實(shí)體類字段好多都是null,本文主要介紹了Mybatis查詢Sql結(jié)果未映射到對應(yīng)得實(shí)體類上的問題解決,具有一定的參考價值,感興趣的可以了解一下2024-02-02MyBatis XML去除多余AND|OR前綴或逗號等后綴的操作
這篇文章主要介紹了MyBatis XML去除多余AND|OR前綴或逗號等后綴的操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-02-02SpringMVC用XML方式實(shí)現(xiàn)AOP的方法示例
這篇文章主要介紹了SpringMVC用XML方式實(shí)現(xiàn)AOP的方法示例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-04-04Java數(shù)據(jù)結(jié)構(gòu)之樹和二叉樹的相關(guān)資料
這篇文章主要介紹了Java?數(shù)據(jù)結(jié)構(gòu)之樹和二叉樹相關(guān)資料,文中通過示例代碼和一些相關(guān)題目來做介紹,非常詳細(xì)。對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下!2023-01-01Java mysql詳細(xì)講解雙數(shù)據(jù)源配置使用
在開發(fā)過程中我們常常會用到兩個數(shù)據(jù)庫,一個數(shù)據(jù)用來實(shí)現(xiàn)一些常規(guī)的增刪改查,另外一個數(shù)據(jù)庫用來實(shí)時存數(shù)據(jù)。進(jìn)行數(shù)據(jù)的統(tǒng)計(jì)分析??梢宰x寫分離??梢愿玫膬?yōu)化和提高效率;或者兩個數(shù)據(jù)存在業(yè)務(wù)分離的時候也需要多個數(shù)據(jù)源來實(shí)現(xiàn)2022-06-06