Spring Cloud Consul實現(xiàn)選舉機制的代碼工程
1.什么是Spring Cloud Consul?
Spring Cloud Consul 是 Spring Cloud 提供的對 HashiCorp Consul 的支持。它是一種基于服務網(wǎng)格的工具,用于實現(xiàn)服務注冊、發(fā)現(xiàn)、配置管理和健康檢查。 主要功能包括:
- 服務注冊與發(fā)現(xiàn):通過 Consul 的服務注冊功能,Spring Cloud Consul 可以實現(xiàn)微服務的動態(tài)注冊和發(fā)現(xiàn),簡化服務間通信。
- 分布式配置管理:通過 Consul 的 Key/Value 存儲機制,提供對分布式配置的管理。
- 健康檢查:支持服務實例的健康檢查,確保只有健康的實例可供其他服務調用。
- 選舉與分布式鎖:通過 Consul 的會話機制,支持分布式鎖和領導選舉。
Spring Cloud Consul 的選舉機制
Spring Cloud Consul 的選舉機制基于 Consul 會話(Session) 和 鍵值存儲(Key/Value Store) 實現(xiàn)分布式領導選舉。
工作原理:
- 會話創(chuàng)建:
- 服務實例向 Consul 創(chuàng)建一個會話(Session),這是一個臨時的、與實例綁定的對象。
- 會話帶有 TTL(生存時間),需要定期續(xù)約,保持活躍狀態(tài)。
- 獲取鎖(Lock):
- 通過將一個 Key 的值設置為當前會話 ID,服務嘗試獲取該 Key 的鎖。
- Consul 使用 CAS(Compare and Swap)操作來確保只有一個服務實例可以成功獲取鎖。
- 鎖定成功:
- 成功獲取鎖的服務實例被視為領導者(Leader)。
- 其他實例會定期嘗試獲取鎖,但只能等待當前鎖被釋放或超時。
- 鎖釋放或失效:
- 如果領導實例未能及時續(xù)約會話(例如宕機或網(wǎng)絡中斷),Consul 會釋放與該會話相關聯(lián)的鎖,其他實例可以競爭成為新的領導者。
2.環(huán)境搭建
run Consul Agent
docker run -d --name=dev-consul -p 8500:8500 consul
web ui
http://localhost:8500
3.代碼工程
實驗目標
- 使用 Consul 提供的會話機制和鍵值存儲來實現(xiàn) 分布式領導選舉。
- 通過
@InboundChannelAdapter
和@ServiceActivator
實現(xiàn)周期性檢查領導身份并執(zhí)行領導任務。
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>springcloud-demo</artifactId> <groupId>com.et</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>LeaderElection</artifactId> <properties> <maven.compiler.source>17</maven.compiler.source> <maven.compiler.target>17</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!-- Spring Cloud Starter Consul Discovery --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-consul-discovery</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-core</artifactId> </dependency> </dependencies> </project>
LeaderElectionConfig.java
package com.et; import jakarta.annotation.PreDestroy; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.http.HttpEntity; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.http.ResponseEntity; import org.springframework.integration.annotation.InboundChannelAdapter; import org.springframework.integration.annotation.Poller; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.core.MessageSource; import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; import org.springframework.web.client.RestTemplate; @Configuration public class LeaderElectionConfig { private static final String LEADER_KEY = "service/leader"; private static final String CONSUL_URL = "http://localhost:8500"; private String sessionId; @Bean @InboundChannelAdapter(value = "leaderChannel", poller = @Poller(fixedDelay = "5000")) public MessageSource<String> leaderMessageSource() { return () -> { // Implement logic to check if this instance is the leader boolean isLeader = checkLeadership(); return MessageBuilder.withPayload(isLeader ? "I am the leader" : "I am not the leader").build(); }; } @Bean @ServiceActivator(inputChannel = "leaderChannel") public MessageHandler leaderMessageHandler() { return new MessageHandler() { @Override public void handleMessage(Message<?> message) throws MessagingException { System.out.println(message.getPayload()); // Implement logic to perform leader-specific tasks } }; } private final RestTemplate restTemplate = new RestTemplate(); public LeaderElectionConfig() { this.sessionId = createSession(); } private String createSession() { String url = CONSUL_URL + "/v1/session/create"; HttpHeaders headers = new HttpHeaders(); HttpEntity<String> entity = new HttpEntity<>("{\"Name\": \"leader-election-session\"}", headers); //ResponseEntity<String> response = restTemplate.postForEntity(url, entity, String.class); // PUT ResponseEntity<String> response = restTemplate.exchange(url, HttpMethod.PUT, entity, String.class); // Extract session ID from response return response.getBody().split("\"")[3]; // This is a simple way to extract the session ID } public boolean checkLeadership() { String url = CONSUL_URL + "/v1/kv/" + LEADER_KEY + "?acquire=" + sessionId; HttpHeaders headers = new HttpHeaders(); HttpEntity<String> entity = new HttpEntity<>(headers); ResponseEntity<Boolean> response = restTemplate.exchange(url, HttpMethod.PUT, entity, Boolean.class); return Boolean.TRUE.equals(response.getBody()); } public void releaseLeadership() { String url = CONSUL_URL + "/v1/kv/" + LEADER_KEY + "?release=" + sessionId; HttpHeaders headers = new HttpHeaders(); HttpEntity<String> entity = new HttpEntity<>(headers); ResponseEntity<Boolean> response = restTemplate.exchange(url, HttpMethod.PUT, entity, Boolean.class); if (Boolean.TRUE.equals(response.getBody())) { System.out.println("Released leadership successfully"); } else { System.out.println("Failed to release leadership"); } } @PreDestroy public void onExit() { releaseLeadership(); } }
代碼解釋
- 初始化:
- 啟動時通過
createSession()
向 Consul 注冊會話。
- 啟動時通過
- 周期性任務:
- 每 5 秒通過
checkLeadership()
檢查領導身份。 - 如果是領導者,執(zhí)行特定任務(如打印日志、執(zhí)行業(yè)務邏輯)。
- 每 5 秒通過
- 釋放資源:
- 應用關閉時,通過
releaseLeadership()
釋放鎖。
- 應用關閉時,通過
LeaderElectionApplication.java
package com.et; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.client.discovery.EnableDiscoveryClient; import org.springframework.integration.config.EnableIntegration; @SpringBootApplication @EnableDiscoveryClient @EnableIntegration public class LeaderElectionApplication { public static void main(String[] args) { SpringApplication.run(LeaderElectionApplication.class, args); } }
配置文件
node1
server.port=8081 spring.cloud.consul.discovery.enabled=true spring.cloud.consul.discovery.register=true spring.application.name=leader-election-example spring.cloud.consul.host=localhost spring.cloud.consul.port=8500 spring.cloud.consul.discovery.instance-id=${spring.application.name}:${spring.application.instance_id:${random.value}}
node2
server.port=8082 spring.cloud.consul.discovery.enabled=true spring.cloud.consul.discovery.register=true spring.application.name=leader-election-example spring.cloud.consul.host=localhost spring.cloud.consul.port=8500 spring.cloud.consul.discovery.instance-id=${spring.application.name}:${spring.application.instance_id:${random.value}}
以上只是一些關鍵代碼。
4.測試
啟動node1節(jié)點
java -jar myapp.jar --spring.profiles.active=node1
啟動node2節(jié)點
java -jar myapp.jar --spring.profiles.active=node2
通過控制臺觀察日志,其中只有一臺機器能選為主機
以上就是Spring Cloud Consul實現(xiàn)選舉機制的代碼工程的詳細內容,更多關于Spring Cloud Consul選舉機制的資料請關注腳本之家其它相關文章!
相關文章
java并發(fā)容器ConcurrentHashMap深入分析
這篇文章主要為大家介紹了java并發(fā)容器ConcurrentHashMap使用示例及深入分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-05-05Java編程實現(xiàn)對十六進制字符串異或運算代碼示例
這篇文章主要介紹了Java編程實現(xiàn)對十六進制字符串異或運算代碼示例,簡述了異或運算以及具體實例,具有一定借鑒價值,需要的朋友可以參考下。2017-12-12Java 多線程并發(fā)AbstractQueuedSynchronizer詳情
這篇文章主要介紹了Java 多線程并發(fā)AbstractQueuedSynchronizer詳情,文章圍繞主題展開想象的內容介紹,具有一定的參考價值,感興趣的小伙伴可以參考一下2022-06-06