Installing and configuring ZooKeeper as a registry center (ZooKeeper as a service registry)
2023-04-15 23:01:39
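How a ZooKeeper registry center works

Service registration:
When the Spring Boot project starts, a custom ApplicationListener listens for the web server startup event.
Once the web server has started successfully, the event callback is triggered.
In the callback, an ephemeral node is created under a designated ZooKeeper node; the ephemeral node records the ip:port the project is running on.
If a service goes down, its ephemeral node is deleted automatically once the service has been disconnected for a certain time (30s by default).

Service discovery:
When the Spring Boot project starts, it fetches from the designated ZooKeeper node the list of all available URLs for the target service (this URL list can be cached).
A load-balancing algorithm then routes each request to one of the servers in the URL list.
Spring's initializer extension mechanism is used to create a watch on the ZooKeeper node; when the list of child nodes changes, the cached URL list is updated.

Overall flow of service registration

A service can only be registered after the container that hosts it has started successfully. For a microservice project this usually means the Tomcat embedded in Spring Boot. Only once it is up is the service usable, and only then can the service be registered with ZooKeeper.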
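So how do we know that the Tomcat container has started successfully?
Through Spring's event listening mechanism: when Tomcat starts successfully an event is published, and we can listen for that event and react to it.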
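The listening idea described above can be pictured with a few lines of plain Java. This is only a sketch of the publish/listen pattern, not Spring's implementation: `MiniEventBus` and `ServerStartedEvent` are hypothetical names invented for the illustration, with `ServerStartedEvent` standing in for the real startup event.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical event type standing in for the "web server has started" event.
final class ServerStartedEvent {
    final int port;
    ServerStartedEvent(int port) { this.port = port; }
}

// Hypothetical minimal publisher: remembers listeners per event type and calls them in order.
final class MiniEventBus {
    private static final class Subscription<T> {
        final Class<T> type;
        final Consumer<T> listener;
        Subscription(Class<T> type, Consumer<T> listener) {
            this.type = type;
            this.listener = listener;
        }
        // Delivers the event only when it is an instance of the subscribed type.
        void deliverIfMatches(Object event) {
            if (type.isInstance(event)) {
                listener.accept(type.cast(event));
            }
        }
    }

    private final List<Subscription<?>> subscriptions = new ArrayList<>();

    <T> void addListener(Class<T> type, Consumer<T> listener) {
        subscriptions.add(new Subscription<>(type, listener));
    }

    void publish(Object event) {
        for (Subscription<?> subscription : subscriptions) {
            subscription.deliverIfMatches(event);
        }
    }
}
```

A listener registered for `ServerStartedEvent` is skipped when any other object is published and is called with the event once the "server" publishes it, which is the hook the registration code relies on.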
Spring event listening mechanism

Create the Spring Boot project order-service
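(1) pom.xml
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.6.8</version>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>5.2.1</version>
    </dependency>
</dependencies>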
(2) Configuration file application.properties
server.ip=192.168.9.1
server.port=9090
# custom configuration
zk.service-name=order-service
zk.server=192.168.1.104:2181
(3) Create the listener: an ApplicationListener that listens for WebServerInitializedEvent, the event published once the Spring web server has finished initializing
import org.springframework.boot.web.context.WebServerInitializedEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.core.env.Environment;

public class ZkApplicationListener implements ApplicationListener<WebServerInitializedEvent> {

    @Override
    public void onApplicationEvent(WebServerInitializedEvent event) {
        System.out.println("Event listener callback...");
        // Read the properties configured in application.properties
        Environment environment = event.getApplicationContext().getEnvironment();
        String serviceName = environment.getProperty("zk.service-name");
        String ip = environment.getProperty("server.ip");
        String port = environment.getProperty("server.port");
        String zkServer = environment.getProperty("zk.server");
        // Register the service
        ServiceRegistry zookeeperServiceRegistry = new ZookeeperServiceRegistry(serviceName, ip, port, zkServer);
        zookeeperServiceRegistry.register();
    }
}
(4) SPI configuration: spring.factories
# Application Listeners
org.springframework.context.ApplicationListener=\
  com.zk.serviceregistry.orderservice.listener.ZkApplicationListener
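(5) Register the service with ZooKeeper
// The Spring Cloud team provides an interface for service registration; this is a simplified one of our own
public interface ServiceRegistry {
    void register();
}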
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class ZookeeperServiceRegistry implements ServiceRegistry {

    private CuratorFramework curatorFramework;
    private final String ip;
    private final String port;
    private final String serviceName;
    private final String basePath = "/zk-registry";

    public ZookeeperServiceRegistry(String serviceName, String ip, String port, String zkServer) {
        this.serviceName = serviceName;
        this.ip = ip;
        this.port = port;
        this.curatorFramework = CuratorFrameworkFactory
                .builder()
                .connectionTimeoutMs(20000)
                .connectString(zkServer)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();
        curatorFramework.start();
    }

    @Override
    public void register() {
        // Path of the service name node
        String serviceNamePath = basePath + "/" + serviceName;
        try {
            if (curatorFramework.checkExists().forPath(serviceNamePath) == null) {
                // Create a persistent node that represents the service name
                this.curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(serviceNamePath);
            }
            // Create an ephemeral child node named ip:port for this instance
            String urlNode = curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath(serviceNamePath + "/" + ip + ":" + port);
            System.out.println("Service " + urlNode + " registered successfully with the zookeeper server...");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
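(6) Start the service and test. Registration has taken effect: the log prints that 127.0.0.1:9090 has been registered with the zookeeper server.
Checking ZooKeeper shows that a new node has been created.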
Start several more instances: 192.168.9.1:9091, 192.168.9.1:9092, 192.168.9.1:9093 and 192.168.9.1:9094. Each new ip:port is registered in ZooKeeper in turn.
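Since every instance shows up as a child node named ip:port, a consumer eventually has to turn that name back into a host and a port. A minimal sketch of that step follows; `InstanceAddress` is a hypothetical helper that does not appear in the project, and it assumes node names always have the host:port form used above.

```java
// Hypothetical value type for one registered instance.
final class InstanceAddress {
    final String host;
    final int port;

    InstanceAddress(String host, int port) {
        this.host = host;
        this.port = port;
    }

    // Accepts either a bare child name ("192.168.9.1:9091") or a full node path
    // ("/zk-registry/order-service/192.168.9.1:9091").
    static InstanceAddress parse(String nodePathOrName) {
        String name = nodePathOrName.substring(nodePathOrName.lastIndexOf('/') + 1);
        int colon = name.lastIndexOf(':');
        if (colon <= 0 || colon == name.length() - 1) {
            throw new IllegalArgumentException("expected host:port but got: " + name);
        }
        return new InstanceAddress(name.substring(0, colon), Integer.parseInt(name.substring(colon + 1)));
    }

    // Builds an HTTP URL for a request path such as "/order/query".
    String toUrl(String path) {
        return "http://" + host + ":" + port + path;
    }
}
```

Validating the name up front means a malformed node is rejected with a clear message instead of producing a broken URL later.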
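Stop one instance, for example 192.168.9.1:9094, to simulate a crash. When the zookeeper server has heard nothing from 192.168.9.1:9094 for a certain time (30s by default), it considers the instance dead and deletes its node from ZooKeeper.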
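The timeout behaviour can be modelled in plain Java to make the rule concrete. This is a deliberately simplified sketch, not how ZooKeeper is implemented: `EphemeralRegistry` is a hypothetical in-memory class, the caller passes the current time explicitly, and the 30s figure is simply the value quoted above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for a parent node that holds ephemeral children.
final class EphemeralRegistry {
    private final long sessionTimeoutMs;
    // child name (ip:port) -> time of the last heartbeat, in milliseconds
    private final Map<String, Long> lastHeartbeat = new TreeMap<>();

    EphemeralRegistry(long sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    // Called on registration and on every later heartbeat of an instance.
    void heartbeat(String address, long nowMs) {
        lastHeartbeat.put(address, nowMs);
    }

    // Drops every child whose last heartbeat is older than the timeout, then lists the rest.
    List<String> children(long nowMs) {
        lastHeartbeat.values().removeIf(seen -> nowMs - seen > sessionTimeoutMs);
        return new ArrayList<>(lastHeartbeat.keySet());
    }
}
```

With a 30000 ms timeout, an instance that last reported at time 0 is still listed at 30000 ms and disappears on the first lookup after that, while instances that keep reporting stay in the list.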
Create the Spring Boot project user-service
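(1) pom.xml
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.6.8</version>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>5.2.1</version>
    </dependency>
</dependencies>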
(2) Configuration file application.properties
server.port=9999
zk.server=192.168.1.104:2181
(3) SPI configuration: spring.factories
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  com.zk.servicediscovery.userservice.config.ZookeeperDiscoveryAutoConfiguration
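(4) Auto-configuration: set up service discovery when the project starts
import javax.annotation.Resource;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;

@Configuration
public class ZookeeperDiscoveryAutoConfiguration {

    @Resource
    private Environment environment;

    @Bean
    public ServiceDiscoveryImpl serviceDiscovery() {
        return new ServiceDiscoveryImpl(environment.getProperty("zk.server"));
    }
}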
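(5) Service discovery and watching
import java.util.List;

public interface ServiceDiscovery {

    // Service discovery: fetch all child nodes (the list of all available service URLs)
    List<String> discovery(String serviceName);

    // Register a watch: when the child nodes change (an instance was added or went down),
    // the watch fires and the service URL list is updated
    void registerWatch(String serviceNamePath);
}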
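import java.util.List;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class ServiceDiscoveryImpl implements ServiceDiscovery {

    private final CuratorFramework curatorFramework;
    private final String basePath = "/zk-registry";

    public ServiceDiscoveryImpl(String zkServer) {
        this.curatorFramework = CuratorFrameworkFactory
                .builder()
                .connectionTimeoutMs(20000)
                .connectString(zkServer)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();
        curatorFramework.start();
    }

    @Override
    public List<String> discovery(String serviceName) {
        // e.g. /zk-registry/order-service
        String serviceNamePath = basePath + "/" + serviceName;
        try {
            if (this.curatorFramework.checkExists().forPath(serviceNamePath) != null) {
                return this.curatorFramework.getChildren().forPath(serviceNamePath);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        // null when the service node does not exist or the lookup failed
        return null;
    }

    @Override
    public void registerWatch(String serviceNamePath) {
        // Permanent watch: fires whenever the children of /zk-registry/order-service change
        CuratorCache curatorCache = CuratorCache.build(curatorFramework, serviceNamePath);
        CuratorCacheListener listener = CuratorCacheListener.builder().forPathChildrenCache(serviceNamePath, curatorFramework, new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                // Pull mode: re-read the children when notified
                System.out.println("Latest urls: " + curatorFramework.getChildren().forPath(serviceNamePath));
            }
        }).build();
        curatorCache.listenable().addListener(listener);
        curatorCache.start();
    }
}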
(6) Pick a service node at random to simulate load balancing
import java.util.List;

public interface LoadBalance {
    String select(List<String> urls);
}
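import java.util.List;
import java.util.Random;

public class RandomLoadBalance implements LoadBalance {

    @Override
    public String select(List<String> urls) {
        int len = urls.size();
        Random random = new Random();
        // Pick a uniformly random index in [0, len)
        return urls.get(random.nextInt(len));
    }
}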
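The random strategy above is only one option. For comparison, here is a sketch of a round-robin strategy behind the same `LoadBalance` interface (repeated here so the block stands alone). `RoundRobinLoadBalance` is a hypothetical class that is not part of the project, and the empty-list check is an added assumption about how callers would want that case handled.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Same shape as the LoadBalance interface shown above.
interface LoadBalance {
    String select(List<String> urls);
}

// Hypothetical alternative to the random strategy: cycles through the URLs in order.
final class RoundRobinLoadBalance implements LoadBalance {
    private final AtomicInteger counter = new AtomicInteger();

    @Override
    public String select(List<String> urls) {
        if (urls == null || urls.isEmpty()) {
            throw new IllegalStateException("no available service instance");
        }
        // floorMod keeps the index non-negative even after the counter overflows.
        int index = Math.floorMod(counter.getAndIncrement(), urls.size());
        return urls.get(index);
    }
}
```

Round-robin spreads consecutive requests evenly and is thread-safe through the atomic counter, whereas the random strategy needs no shared state at all; which one fits better depends on the traffic pattern.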
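(7) UserController: simulate a request
import java.io.IOException;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;

@RestController
@RequestMapping("/user")
public class UserController {

    @Autowired
    private ServiceDiscovery serviceDiscovery;

    @RequestMapping("/discovery")
    public void discovery() throws IOException {
        List<String> urls = this.serviceDiscovery.discovery("order-service");
        LoadBalance loadBalance = new RandomLoadBalance();
        String url = loadBalance.select(urls);
        System.out.println("Selected order-service node: " + url);
        String response = new RestTemplate().getForObject("http://" + url + "/order/query", String.class);
        System.out.println("order-service response: " + response);
        // Add a watch on the order-service node
        this.serviceDiscovery.registerWatch("/zk-registry/order-service");
    }
}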
(8) Test by visiting http://192.168.9.1:9999/user/discovery
Stop one of the order-service nodes and, without restarting anything, call user-service again.
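No restart is needed because the consumer can keep its own copy of the URL list and swap it whenever the watch fires. A sketch of such a cache follows; `UrlCache` is a hypothetical class, and the sample project itself only prints the new list in the watch callback rather than caching it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical cache of provider URLs: a watcher thread replaces it, request threads read it.
final class UrlCache {
    private final AtomicReference<List<String>> urls =
            new AtomicReference<>(Collections.<String>emptyList());

    // Called from the watch callback with the latest children of the service node.
    void refresh(List<String> latest) {
        urls.set(Collections.unmodifiableList(new ArrayList<>(latest)));
    }

    // Returns an immutable snapshot, so callers never observe a half-updated list.
    List<String> snapshot() {
        return urls.get();
    }
}
```

Replacing the whole list atomically, instead of adding and removing entries in place, keeps readers lock-free: a request either sees the old list or the new one.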
Create a Spring Boot project named spring-cloud-zookeeper, using Spring Boot 2.6.8
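(1) pom.xml
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.6.8</version>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-zookeeper-config</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-bootstrap</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-zookeeper-discovery</artifactId>
    </dependency>
</dependencies>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>2021.0.3</version>
            <scope>import</scope>
            <type>pom</type>
        </dependency>
    </dependencies>
</dependencyManagement>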
(2) Configuration file application.properties

server.port=9091
spring.cloud.zookeeper.connect-string=192.168.1.104:2181
spring.cloud.zookeeper.discovery.root=/services/registries
spring.application.name=spring-cloud-zookeeper
Or application.yml

spring:
  cloud:
    zookeeper:
      connect-string: 192.168.1.104:2181
      discovery:
        root: /services/registries
  application:
    name: spring-cloud-zookeeper
server:
  port: 9091
(3) bootstrap.yaml
spring:
  profiles:
    active: dev
  application:
    name: spring-cloud-zookeeper # which ZNode to look up: spring-cloud-zookeeper-dev
  cloud:
    zookeeper:
      config:
        root: config # equivalent to /zk-config/spring-cloud-zookeeper-dev
        profile-separator: "-"
        enabled: true
      connect-string: 192.168.1.104:2181
(4) Start the Spring Boot project and look at the data on the ZooKeeper server: the corresponding node has been created automatically.
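import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class SpringCloudZkDiscoveryController {

    // 1. Inject the service discovery client interface
    @Autowired
    private DiscoveryClient discoveryClient;

    @RequestMapping("/sc-zk-discovery")
    public List<ServiceInstance> serviceUrl() {
        // 2. getInstances returns all available instances of the service
        List<ServiceInstance> instances = discoveryClient.getInstances("spring-cloud-zookeeper");
        String url = instances.get(0).getUri().toString();
        System.out.println("url=" + url);
        return instances;
    }
}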
Access test

Much like our hand-written service registration, Spring Cloud also defines its own listener, AbstractAutoServiceRegistration, which listens for the web server startup event WebServerInitializedEvent.
Source snippet of org.springframework.cloud.client.serviceregistry.AbstractAutoServiceRegistration#onApplicationEvent(WebServerInitializedEvent):
public abstract class AbstractAutoServiceRegistration<R extends Registration>
        implements AutoServiceRegistration, ApplicationContextAware, ApplicationListener<WebServerInitializedEvent> {

    private final ServiceRegistry<R> serviceRegistry;

    // ...

    public void onApplicationEvent(WebServerInitializedEvent event) {
        this.bind(event);
    }

    @Deprecated
    public void bind(WebServerInitializedEvent event) {
        ApplicationContext context = event.getApplicationContext();
        // Skip the event published by the separate "management" web server
        if (!(context instanceof ConfigurableWebServerApplicationContext)
                || !"management".equals(((ConfigurableWebServerApplicationContext) context).getServerNamespace())) {
            this.port.compareAndSet(0, event.getWebServer().getPort());
            this.start();
        }
    }

    public void start() {
        if (!this.isEnabled()) {
            return;
        }
        // Register only once
        if (!this.running.get()) {
            this.context.publishEvent(new InstancePreRegisteredEvent(this, this.getRegistration()));
            this.register();
            if (this.shouldRegisterManagement()) {
                this.registerManagement();
            }
            this.context.publishEvent(new InstanceRegisteredEvent(this, this.getConfiguration()));
            this.running.compareAndSet(false, true);
        }
    }
}
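The `running` flag in `start()` is what keeps registration from happening twice. The same guard can be shown in isolation; the sketch below is a variant, not the Spring Cloud code: `OnceOnlyRegistrar` is a hypothetical class, and it flips the flag with `compareAndSet` before doing the work, whereas the snippet above checks the flag first and sets it afterwards.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical registrar that registers at most once, however often start() is called.
final class OnceOnlyRegistrar {
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final AtomicInteger registrations = new AtomicInteger();

    void start() {
        // compareAndSet flips false -> true for exactly one caller; every other call returns here.
        if (!running.compareAndSet(false, true)) {
            return;
        }
        registrations.incrementAndGet(); // stands in for the real register() call
    }

    int registrations() {
        return registrations.get();
    }
}
```

Calling `start()` repeatedly, even from several threads, leaves the registration count at one.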
(2) Service registration and discovery

getRegistration returns the concrete registration implementation class:
org.springframework.cloud.zookeeper.serviceregistry.ZookeeperAutoServiceRegistration
serviceRegistry.register(registration) is the concrete service registration implementation:
public class ZookeeperServiceRegistry implements ServiceRegistry<ZookeeperRegistration>, SmartInitializingSingleton, Closeable {

    protected CuratorFramework curator;
    protected ZookeeperDiscoveryProperties properties;
    private ServiceDiscovery<ZookeeperInstance> serviceDiscovery;

    public void register(ZookeeperRegistration registration) {
        try {
            // Delegate to Curator's service discovery to create the instance node
            this.getServiceDiscovery().registerService(registration.getServiceInstance());
        } catch (Exception var3) {
            ReflectionUtils.rethrowRuntimeException(var3);
        }
    }
}
Service discovery implementation class: creates the ZooKeeper node and the node watch
public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T> {

    public void registerService(ServiceInstance<T> service) throws Exception {
        ServiceDiscoveryImpl.Entry<T> newEntry = new ServiceDiscoveryImpl.Entry(service);
        ServiceDiscoveryImpl.Entry<T> oldEntry = (ServiceDiscoveryImpl.Entry) this.services.putIfAbsent(service.getId(), newEntry);
        ServiceDiscoveryImpl.Entry<T> useEntry = oldEntry != null ? oldEntry : newEntry;
        synchronized (useEntry) {
            if (useEntry == newEntry) {
                // Create the node watch
                useEntry.cache = this.makeNodeCache(service);
            }
            // Create the ZooKeeper node
            this.internalRegisterService(service);
        }
    }

    // Create the node watch
    private CuratorCacheBridge makeNodeCache(ServiceInstance<T> instance) {
        if (!this.watchInstances) {
            return null;
        } else {
            CuratorCacheBridge cache = CuratorCache.bridgeBuilder(this.client, this.pathForInstance(instance.getName(), instance.getId()))
                    .withOptions(new Options[]{Options.SINGLE_NODE_CACHE})
                    .withDataNotCached()
                    .build();
            CuratorCacheListener listener = CuratorCacheListener.builder().afterInitialized().forAll((__, ___, data) -> {
                if (data != null) {
                    try {
                        ServiceInstance<T> newInstance = this.serializer.deserialize(data.getData());
                        ServiceDiscoveryImpl.Entry<T> entry = (ServiceDiscoveryImpl.Entry) this.services.get(newInstance.getId());
                        if (entry != null) {
                            synchronized (entry) {
                                entry.service = newInstance;
                            }
                        }
                    } catch (Exception var10) {
                        this.log.debug("Could not deserialize: " + data.getPath());
                    }
                } else {
                    this.log.warn("Instance data has been deleted for: " + instance);
                }
            }).build();
            cache.listenable().addListener(listener);
            cache.start();
            return cache;
        }
    }

    // Create the ZooKeeper node
    @VisibleForTesting
    protected void internalRegisterService(ServiceInstance<T> service) throws Exception {
        byte[] bytes = this.serializer.serialize(service);
        String path = this.pathForInstance(service.getName(), service.getId());
        int MAX_TRIES = true; // as emitted by the decompiler; not valid Java
        boolean isDone = false;
        for (int i = 0; !isDone && i ... // the rest of this method is cut off in the source

Execution flow: web server startup -> listener receives the service startup event:
SpringApplication.run(args) -> refreshContext(context) -> refresh(context) -> ServletWebServerApplicationContext.refresh -> AbstractApplicationContext.refresh -> finishRefresh -> DefaultLifecycleProcessor.onRefresh -> startBeans(true) -> DefaultLifecycleProcessor$LifecycleGroup.start -> doStart -> WebServerStartStopLifecycle.start -> AbstractApplicationContext.publishEvent(new ServletWebServerInitializedEvent(this.webServer, this.applicationContext)) -> SimpleApplicationEventMulticaster.multicastEvent(applicationEvent, eventType) -> invokeListener(listener, event) -> doInvokeListener(listener, event) -> listener.onApplicationEvent(event);
Stack trace:
onApplicationEvent:12, ZkApplicationListener (com.zk.serviceregistry.orderservice.listener)
doInvokeListener:176, SimpleApplicationEventMulticaster (org.springframework.context.event)
invokeListener:169, SimpleApplicationEventMulticaster (org.springframework.context.event)
multicastEvent:143, SimpleApplicationEventMulticaster (org.springframework.context.event)
publishEvent:421, AbstractApplicationContext (org.springframework.context.support)
publishEvent:378, AbstractApplicationContext (org.springframework.context.support)
start:46, WebServerStartStopLifecycle (org.springframework.boot.web.servlet.context)
doStart:178, DefaultLifecycleProcessor (org.springframework.context.support)
access$200:54, DefaultLifecycleProcessor (org.springframework.context.support)
start:356, DefaultLifecycleProcessor$LifecycleGroup (org.springframework.context.support)
accept:-1, 1643565953 (org.springframework.context.support.DefaultLifecycleProcessor$$Lambda$541)
forEach:75, Iterable (java.lang)
startBeans:155, DefaultLifecycleProcessor (org.springframework.context.support)
onRefresh:123, DefaultLifecycleProcessor (org.springframework.context.support)
finishRefresh:935, AbstractApplicationContext (org.springframework.context.support)
refresh:586, AbstractApplicationContext (org.springframework.context.support)
refresh:145, ServletWebServerApplicationContext (org.springframework.boot.web.servlet.context)
refresh:745, SpringApplication (org.springframework.boot)
refreshContext:420, SpringApplication (org.springframework.boot)
run:307, SpringApplication (org.springframework.boot)
run:1317, SpringApplication (org.springframework.boot)
run:1306, SpringApplication (org.springframework.boot)
main:10, OrderServiceApplication (com.zk.serviceregistry.orderservice)
1) SpringApplication.run
public class SpringApplication {

    public ConfigurableApplicationContext run(String... args) {
        long startTime = System.nanoTime();
        DefaultBootstrapContext bootstrapContext = this.createBootstrapContext();
        ConfigurableApplicationContext context = null;
        this.configureHeadlessProperty();
        SpringApplicationRunListeners listeners = this.getRunListeners(args);
        listeners.starting(bootstrapContext, this.mainApplicationClass);

        try {
            ApplicationArguments applicationArguments = new DefaultApplicationArguments(args);
            ConfigurableEnvironment environment = this.prepareEnvironment(listeners, bootstrapContext, applicationArguments);
            this.configureIgnoreBeanInfo(environment);
            Banner printedBanner = this.printBanner(environment);
            context = this.createApplicationContext();
            context.setApplicationStartup(this.applicationStartup);
            this.prepareContext(bootstrapContext, context, environment, listeners, applicationArguments, printedBanner);
            this.refreshContext(context); // 1
            this.afterRefresh(context, applicationArguments);
            Duration timeTakenToStartup = Duration.ofNanos(System.nanoTime() - startTime);
            if (this.logStartupInfo) {
                (new StartupInfoLogger(this.mainApplicationClass)).logStarted(this.getApplicationLog(), timeTakenToStartup);
            }
            listeners.started(context, timeTakenToStartup);
            this.callRunners(context, applicationArguments);
        } catch (Throwable var12) {
            this.handleRunFailure(context, var12, listeners);
            throw new IllegalStateException(var12);
        }

        try {
            Duration timeTakenToReady = Duration.ofNanos(System.nanoTime() - startTime);
            listeners.ready(context, timeTakenToReady);
            return context;
        } catch (Throwable var11) {
            this.handleRunFailure(context, var11, (SpringApplicationRunListeners) null);
            throw new IllegalStateException(var11);
        }
    }
}
2) refreshContext(context)
private void refreshContext(ConfigurableApplicationContext context) {
    if (this.registerShutdownHook) {
        shutdownHook.registerApplicationContext(context);
    }
    this.refresh(context); // 2
}
3) AbstractApplicationContext.refresh
public void refresh() throws BeansException, IllegalStateException {
    synchronized (this.startupShutdownMonitor) {
        StartupStep contextRefresh = this.applicationStartup.start("spring.context.refresh");
        this.prepareRefresh();
        ConfigurableListableBeanFactory beanFactory = this.obtainFreshBeanFactory();
        this.prepareBeanFactory(beanFactory);

        try {
            this.postProcessBeanFactory(beanFactory);
            StartupStep beanPostProcess = this.applicationStartup.start("spring.context.beans.post-process");
            this.invokeBeanFactoryPostProcessors(beanFactory);
            this.registerBeanPostProcessors(beanFactory);
            beanPostProcess.end();
            this.initMessageSource();
            this.initApplicationEventMulticaster();
            this.onRefresh();
            this.registerListeners();
            this.finishBeanFactoryInitialization(beanFactory);
            this.finishRefresh(); // 3
        } catch (BeansException var10) {
            if (this.logger.isWarnEnabled()) {
                this.logger.warn("Exception encountered during context initialization - cancelling refresh attempt: " + var10);
            }
            this.destroyBeans();
            this.cancelRefresh(var10);
            throw var10;
        } finally {
            this.resetCommonCaches();
            contextRefresh.end();
        }
    }
}
4) finishRefresh
protected void finishRefresh() {
    this.clearResourceCaches();
    this.initLifecycleProcessor();
    this.getLifecycleProcessor().onRefresh(); // 4
    this.publishEvent((ApplicationEvent) (new ContextRefreshedEvent(this)));
    if (!NativeDetector.inNativeImage()) {
        LiveBeansView.registerApplicationContext(this);
    }
}
5) DefaultLifecycleProcessor.onRefresh
public void onRefresh() {
    this.startBeans(true); // 5
    this.running = true;
}
6) startBeans(true)
private void startBeans(boolean autoStartupOnly) {
    Map<String, Lifecycle> lifecycleBeans = this.getLifecycleBeans();
    // TreeMap keeps the phases sorted, so lower phases start first
    Map<Integer, DefaultLifecycleProcessor.LifecycleGroup> phases = new TreeMap();
    lifecycleBeans.forEach((beanName, bean) -> {
        if (!autoStartupOnly || bean instanceof SmartLifecycle && ((SmartLifecycle) bean).isAutoStartup()) {
            int phase = this.getPhase(bean);
            ((DefaultLifecycleProcessor.LifecycleGroup) phases.computeIfAbsent(phase, (p) -> {
                return new DefaultLifecycleProcessor.LifecycleGroup(phase, this.timeoutPerShutdownPhase, lifecycleBeans, autoStartupOnly);
            })).add(beanName, bean);
        }
    });
    if (!phases.isEmpty()) {
        phases.values().forEach(DefaultLifecycleProcessor.LifecycleGroup::start); // 6
    }
}
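The grouping in `startBeans` comes down to a sorted map from phase to the beans in that phase. The following sketch isolates just that idea; `PhasedStarter` and the bean names used with it are hypothetical, and timeouts, auto-startup checks and the real `LifecycleGroup` are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, stripped-down version of "group by phase, then start in phase order".
final class PhasedStarter {
    // phase -> bean names registered in that phase; TreeMap iterates keys in ascending order
    private final Map<Integer, List<String>> phases = new TreeMap<>();

    void add(int phase, String beanName) {
        phases.computeIfAbsent(phase, p -> new ArrayList<>()).add(beanName);
    }

    // Returns the bean names in the order in which they would be started.
    List<String> startOrder() {
        List<String> order = new ArrayList<>();
        phases.values().forEach(order::addAll);
        return order;
    }
}
```

Beans added with phases 100, 0, -10 and 0 come back ordered by phase (-10, 0, 0, 100), with beans in the same phase keeping their insertion order. A bean with a very high phase, as the web server lifecycle bean is assumed to be here, therefore starts last.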
7) DefaultLifecycleProcessor$LifecycleGroup.start
public void start() {
    if (!this.members.isEmpty()) {
        if (DefaultLifecycleProcessor.this.logger.isDebugEnabled()) {
            DefaultLifecycleProcessor.this.logger.debug("Starting beans in phase " + this.phase);
        }
        Collections.sort(this.members);
        Iterator var1 = this.members.iterator();
        while (var1.hasNext()) {
            DefaultLifecycleProcessor.LifecycleGroupMember member = (DefaultLifecycleProcessor.LifecycleGroupMember) var1.next();
            DefaultLifecycleProcessor.this.doStart(this.lifecycleBeans, member.name, this.autoStartupOnly); // 7
        }
    }
}
8) DefaultLifecycleProcessor.doStart
private void doStart(Map<String, ? extends Lifecycle> lifecycleBeans, String beanName, boolean autoStartupOnly) {
    // remove() ensures each bean is processed at most once
    Lifecycle bean = (Lifecycle) lifecycleBeans.remove(beanName);
    if (bean != null && bean != this) {
        // Start the beans this bean depends on first
        String[] dependenciesForBean = this.getBeanFactory().getDependenciesForBean(beanName);
        String[] var6 = dependenciesForBean;
        int var7 = dependenciesForBean.length;
        for (int var8 = 0; var8 < var7; ++var8) {
            String dependency = var6[var8];
            this.doStart(lifecycleBeans, dependency, autoStartupOnly);
        }
        if (!bean.isRunning() && (!autoStartupOnly || !(bean instanceof SmartLifecycle) || ((SmartLifecycle) bean).isAutoStartup())) {
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Starting bean '" + beanName + "' of type [" + bean.getClass().getName() + "]");
            }
            try {
                bean.start(); // 8
            } catch (Throwable var10) {
                throw new ApplicationContextException("Failed to start bean '" + beanName + "'", var10);
            }
        }
    }
}
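`doStart` starts a bean's dependencies before the bean itself, and removing the bean from the map first is what prevents double starts. The sketch below reproduces only that recursion on plain strings; `DependencyFirstStarter` and the bean names used with it are hypothetical and simply illustrate the ordering.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical dependency-first starter working on bean names only.
final class DependencyFirstStarter {
    private final Map<String, List<String>> dependencies; // bean -> beans it depends on
    private final List<String> started = new ArrayList<>();

    DependencyFirstStarter(Map<String, List<String>> dependencies) {
        this.dependencies = dependencies;
    }

    // Starts every bean in beanNames, always starting a bean's dependencies before the bean.
    List<String> startAll(List<String> beanNames) {
        Set<String> pending = new LinkedHashSet<>(beanNames);
        for (String name : beanNames) {
            doStart(pending, name);
        }
        return started;
    }

    private void doStart(Set<String> pending, String name) {
        // remove() returns false when the bean was already handled, which also breaks cycles.
        if (!pending.remove(name)) {
            return;
        }
        for (String dependency : dependencies.getOrDefault(name, Collections.<String>emptyList())) {
            doStart(pending, dependency);
        }
        started.add(name);
    }
}
```

Removing the name before recursing means that even a circular dependency terminates, because the second visit finds nothing left to remove.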
9) Publish the "web server started" event
Event: ServletWebServerInitializedEvent extends WebServerInitializedEvent
public void start() {
    this.webServer.start();
    this.running = true;
    this.applicationContext.publishEvent(new ServletWebServerInitializedEvent(this.webServer, this.applicationContext)); // 9
}
10) Publishing the event: AbstractApplicationContext.publishEvent
protected void publishEvent(Object event, @Nullable ResolvableType eventType) {
    // ... applicationEvent is derived from event here (omitted in this excerpt) ...
    if (this.earlyApplicationEvents != null) {
        // The multicaster is not ready yet: buffer the event
        this.earlyApplicationEvents.add(applicationEvent);
    } else {
        this.getApplicationEventMulticaster().multicastEvent((ApplicationEvent) applicationEvent, eventType); // 10
    }
    // Propagate the event to the parent context as well
    if (this.parent != null) {
        if (this.parent instanceof AbstractApplicationContext) {
            ((AbstractApplicationContext) this.parent).publishEvent(event, eventType);
        } else {
            this.parent.publishEvent(event);
        }
    }
}
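The first branch of `publishEvent` buffers events that arrive before the multicaster exists. A small sketch of that buffering pattern follows; `BufferingPublisher` is a hypothetical class and `ready` is an invented method name for the moment at which buffered events get replayed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical publisher that queues events until a sink is available, then replays them.
final class BufferingPublisher {
    private List<Object> earlyEvents = new ArrayList<>(); // non-null only until the sink is ready
    private Consumer<Object> sink;

    void publish(Object event) {
        if (earlyEvents != null) {
            earlyEvents.add(event); // too early: remember the event for later
        } else {
            sink.accept(event);     // normal path: deliver immediately
        }
    }

    // Installs the sink and replays, in order, everything published before it existed.
    void ready(Consumer<Object> sink) {
        this.sink = sink;
        List<Object> buffered = earlyEvents;
        earlyEvents = null;
        if (buffered != null) {
            buffered.forEach(sink);
        }
    }
}
```

Using the buffer's null-ness as the "ready" signal mirrors the `earlyApplicationEvents != null` check above: once the buffer is dropped, every later event takes the direct path.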
11) The multicaster publishes the event: SimpleApplicationEventMulticaster.multicastEvent(event, eventType)
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
    ResolvableType type = eventType != null ? eventType : this.resolveDefaultEventType(event);
    Executor executor = this.getTaskExecutor();
    Iterator var5 = this.getApplicationListeners(event, type).iterator();
    while (var5.hasNext()) {
        ApplicationListener<?> listener = (ApplicationListener) var5.next();
        if (executor != null) {
            // Asynchronous dispatch when a task executor is configured
            executor.execute(() -> {
                this.invokeListener(listener, event);
            });
        } else {
            // Otherwise the listener runs on the publishing thread
            this.invokeListener(listener, event); // 11
        }
    }
}
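The choice between the two branches, executor or direct call, decides whether listeners run on the publishing thread. Here is a compact sketch of the same switch; `MiniMulticaster` is a hypothetical class, and passing `null` as the executor is an assumption made to mirror the `executor != null` check above.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical multicaster: calls listeners directly, or hands each call to an Executor.
final class MiniMulticaster<E> {
    private final List<Consumer<E>> listeners = new CopyOnWriteArrayList<>();
    private final Executor executor; // null means "call listeners on the publishing thread"

    MiniMulticaster(Executor executor) {
        this.executor = executor;
    }

    void addListener(Consumer<E> listener) {
        listeners.add(listener);
    }

    void multicast(E event) {
        for (Consumer<E> listener : listeners) {
            if (executor != null) {
                executor.execute(() -> listener.accept(event)); // asynchronous dispatch
            } else {
                listener.accept(event);                         // synchronous dispatch
            }
        }
    }
}
```

With no executor, a listener such as the registration callback has finished by the time `multicast` returns; with an executor, the publisher only knows that the call has been handed off.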
12) Invoke the listener: invokeListener(listener, event)
protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {
    ErrorHandler errorHandler = this.getErrorHandler();
    if (errorHandler != null) {
        try {
            this.doInvokeListener(listener, event);
        } catch (Throwable var5) {
            errorHandler.handleError(var5);
        }
    } else {
        this.doInvokeListener(listener, event); // 12
    }
}

private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
    try {
        listener.onApplicationEvent(event); // 13
    } catch (ClassCastException var6) {
    }
}
13) Invoke the listener's event callback method
public abstract class AbstractAutoServiceRegistration<R extends Registration>
        implements AutoServiceRegistration, ApplicationContextAware, ApplicationListener<WebServerInitializedEvent> {

    public void onApplicationEvent(WebServerInitializedEvent event) {
        // TODO ...
    }
}