spring源碼真正核心點(集成源碼深度剖析)
2023-04-19 01:19:19 2
Fescar 簡介常見的分布式事務方式有基於 2PC 的 XA (e.g. atomikos),從業務層入手的 TCC( e.g. byteTCC)、事務消息 ( e.g. RocketMQ Half Message) 等等。XA 是需要本地資料庫支持的分布式事務的協議,資源鎖在資料庫層面導致性能較差,而支付寶作為布道師引入的 TCC 模式需要大量的業務代碼保證,開發維護成本較高。
分布式事務是業界比較關注的領域,這也是短短時間 Fescar 能收穫6k Star的原因之一。Fescar 名字取自 Fast & Easy Commit And Rollback ,簡單來說Fescar通過對本地 RDBMS 分支事務的協調來驅動完成全局事務,是工作在應用層的中間件。主要優點是相對於XA模式是性能較好不長時間佔用連接資源,相對於 TCC 方式開發成本和業務侵入性較低。
類似於 XA,Fescar 將角色分為 TC、RM、TM,事務整體過程模型如下:

1. TM 向 TC 申請開啟一個全局事務,全局事務創建成功並生成一個全局唯一的 XID。2. XID 在微服務調用鏈路的上下文中傳播。3. RM 向 TC 註冊分支事務,將其納入 XID 對應全局事務的管轄。4. TM 向 TC 發起針對 XID 的全局提交或回滾決議。5. TC 調度 XID 下管轄的全部分支事務完成提交或回滾請求。
其中在目前的實現版本中 TC 是獨立部署的進程,維護全局事務的操作記錄和全局鎖記錄,負責協調並驅動全局事務的提交或回滾。TM RM 則與應用程式工作在同一應用進程。RM對 JDBC 數據源採用代理的方式對底層資料庫做管理,利用語法解析,在執行事務保留快照,並生成 undo log。大概的流程和模型劃分就介紹到這裡,下面開始對 Fescar 事務傳播機制的分析。
Fescar 事務傳播機制Fescar 事務傳播包括應用內事務嵌套調用和跨服務調用的事務傳播。Fescar 事務是怎麼在微服務調用鏈中傳播的呢?Fescar 提供了事務 API 允許用戶手動綁定事務的 XID 並加入到全局事務中,所以我們根據不同的服務框架機制,將 XID 在鏈路中傳遞即可實現事務的傳播。
RPC 請求過程分為調用方與被調用方兩部分,我們將 XID 在請求與響應時做相應的處理即可。大致過程為:調用方即請求方將當前事務上下文中的 XID 取出,通過RPC協議傳遞給被調用方;被調用方從請求中的將 XID 取出,並綁定到自己的事務上下文中,納入全局事務。微服務框架一般都有相應的 Filter 和 Interceptor 機制,我們來分析下 Spring Cloud 與Fescar 的整合過程。
Fescar 與 Spring Cloud Alibaba 集成部分源碼解析本部分源碼全部來自於 spring-cloud-alibaba-fescar. 源碼解析部分主要包括AutoConfiguration、微服務被調用方和微服務調用方三大部分。對於微服務調用方方式具體分為 RestTemplate 和 Feign,對於 Feign 請求方式又進一步細分為結合 Hystrix 和 Sentinel 的使用模式。
Fescar AutoConfiguration對於 AutoConfiguration 部分的解析此處只介紹與 Fescar 啟動相關的部分,其他部分的解析將穿插於【微服務被調用方】和【微服務調用方】章節進行介紹。
Fescar 的啟動需要配置 GlobalTransactionScanner,GlobalTransactionScanner 負責初始化 Fescar 的 RM client、TM client 和 自動代理標註 GlobalTransactional 註解的類。
GlobalTransactionScanner bean 的啟動通過 GlobalTransactionAutoConfiguration 加載並注入FescarProperties。
FescarProperties 包含了 Fescar的重要屬性 txServiceGroup ,此屬性的可通過 application.properties 文件中的 key: spring.cloud.alibaba.fescar.txServiceGroup 讀取,默認值為 ${spring.application.name}-fescar-service-group 。txServiceGroup 表示Fescar 的邏輯事務分組名,此分組名通過配置中心(目前支持文件、Apollo)獲取邏輯事務分組名對應的 TC 集群名稱,進一步通過集群名稱構造出 TC 集群的服務名,通過註冊中心(目前支持Nacos、Redis、ZooKeeper和Eureka)和服務名找到可用的 TC 服務節點,然後 RM client、TM client 與 TC 進行 RPC 交互。
微服務被調用方
由於調用方的邏輯比較多一點,我們先分析被調用方的邏輯。針對於 Spring Cloud 項目,默認採用的 RPC 傳輸協議時 HTTP 協議,所以使用了 HandlerInterceptor 機制來對HTTP的請求做攔截。
HandlerInterceptor 是 Spring 提供的接口, 它有以下三個方法可以被覆寫。
/** * Intercept the execution of a handler. Called after HandlerMapping determined * an appropriate handler object, but before HandlerAdapter invokes the handler. */ default boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { return true; } /** * Intercept the execution of a handler. Called after HandlerAdapter actually * invoked the handler, but before the DispatcherServlet renders the view. * Can expose additional model objects to the view via the given ModelAndView. */ default void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, @Nullable ModelAndView modelAndView) throws Exception { } /** * Callback after completion of request processing, that is, after rendering * the view. Will be called on any outcome of handler execution, thus allows * for proper resource cleanup. */ default void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, @Nullable Exception ex) throws Exception { }
根據注釋,我們可以很明確的看到各個方法的作用時間和常用用途。對於 Fescar 集成來講,它需要重寫了 preHandle、afterCompletion 方法。
FescarHandlerInterceptor 的作用是將服務鏈路傳遞過來的 XID,綁定到服務節點的事務上下文中,並且在請求完成後清理相關資源。FescarHandlerInterceptorConfiguration 中配置了所有的 url 均進行攔截,對所有的請求過來均會執行該攔截器,進行 XID 的轉換與事務綁定。
/** * @author xiaojing * * Fescar HandlerInterceptor, Convert Fescar information into * @see com.alibaba.fescar.core.context.RootContext from http request's header in * {@link org.springframework.web.servlet.HandlerInterceptor#preHandle(HttpServletRequest , HttpServletResponse , Object )}, * And clean up Fescar information after servlet method invocation in * {@link org.springframework.web.servlet.HandlerInterceptor#afterCompletion(HttpServletRequest, HttpServletResponse, Object, Exception)} */public class FescarHandlerInterceptor implements HandlerInterceptor { private static final Logger log = LoggerFactory .getLogger(FescarHandlerInterceptor.class); @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { String xid = RootContext.getXID; String rpcXid = request.getHeader(RootContext.KEY_XID); if (log.isdebugEnabled) { log.debug("xid in RootContext {} xid in RpcContext {}", xid, rpcXid); } if (xid == null && rpcXid != null) { RootContext.bind(rpcXid); if (log.isDebugEnabled) { log.debug("bind {} to RootContext", rpcXid); } } return true; } @Override public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception e) throws Exception { String rpcXid = request.getHeader(RootContext.KEY_XID); if (StringUtils.isEmpty(rpcXid)) { return; } String unbindXid = RootContext.unbind; if (log.isDebugEnabled) { log.debug("unbind {} from RootContext", unbindXid); } if (!rpcXid.equalsIgnoreCase(unbindXid)) { log.warn("xid in change during RPC from {} to {}", rpcXid, unbindXid); if (unbindXid != null) { RootContext.bind(unbindXid); log.warn("bind {} back to RootContext", unbindXid); } } }}
preHandle 在請求執行前被調用,xid 為當前事務上下文已經綁定的全局事務的唯一標識,rpcXid 為請求通過 HTTP Header 傳遞過來需要綁定的全局事務標識。preHandle 方法中判斷如果當前事務上下文中沒有 XID,且 rpcXid 不為空,那麼就將 rpcXid 綁定到當前的事務上下文。
afterCompletion 在請求完成後被調用,該方法用來執行資源的相關清理動作。Fescar 通過 RootContext.unbind 方法對事務上下文涉及到的 XID 進行解綁。下面 if 中的邏輯是為了代碼的健壯性考慮,如果遇到 rpcXid和 unbindXid 不相等的情況,再將 unbindXid 重新綁定回去。
對於 Spring Cloud 來講,默認採用的 RPC 方式是 HTTP 的方式,所以對被調用方來講,它的請求攔截方式不用做任何區分,只需要從 Header 中將 XID 就可以取出綁定到自己的事務上下文中即可。但是對於調用方由於請求組件的多樣化,包括熔斷隔離機制,所以要區分不同的情況做處理,後面我們來具體分析一下。
微服務調用方
Fescar 將請求方式分為:RestTemplate、Feign、Feign Hystrix 和 Feign Sentinel 。不同的組件通過 Spring Boot 的 Auto Configuration 來完成自動的配置,具體的配置類可以看 spring.factories ,下文也會介紹相關的配置類。
RestTemplate
先來看下如果調用方如果是是基於 RestTemplate 的請求,Fescar 是怎麼傳遞 XID 的。
public class FescarRestTemplateInterceptor implements ClientHttpRequestInterceptor { @Override public ClientHttpResponse intercept(HttpRequest httpRequest, byte[] bytes, ClientHttpRequestExecution clientHttpRequestExecution) throws IOException { HttpRequestWrapper requestWrapper = new HttpRequestWrapper(httpRequest); String xid = RootContext.getXID; if (!StringUtils.isEmpty(xid)) { requestWrapper.getHeaders.add(RootContext.KEY_XID, xid); } return clientHttpRequestExecution.execute(requestWrapper, bytes); }}
FescarRestTemplateInterceptor 實現了 ClientHttpRequestInterceptor 接口的 intercept 方法,對調用的請求做了包裝,在發送請求時若存在 Fescar 事務上下文 XID 則取出並放到 HTTP Header 中。
FescarRestTemplateInterceptor 通過 FescarRestTemplateAutoConfiguration 實現將 FescarRestTemplateInterceptor 配置到 RestTemplate 中去。
@Configurationpublic class FescarRestTemplateAutoConfiguration { @Bean public FescarRestTemplateInterceptor fescarRestTemplateInterceptor { return new FescarRestTemplateInterceptor; } @Autowired(required = false) private Collection restTemplates; @Autowired private FescarRestTemplateInterceptor fescarRestTemplateInterceptor; @PostConstruct public void init { if (this.restTemplates != null) { for (RestTemplate restTemplate : restTemplates) { List interceptors = new ArrayList( restTemplate.getInterceptors); interceptors.add(this.fescarRestTemplateInterceptor); restTemplate.setInterceptors(interceptors); } } }}
init 方法遍歷所有的 restTemplate ,並將原來 restTemplate 中的攔截器取出,增加 fescarRestTemplateInterceptor 後置入並重排序。
Feign

接下來看下 Feign 的相關代碼,該包下面的類還是比較多的,我們先從其 AutoConfiguration 入手。
@Configuration@ConditionalOnClass(Client.class)@AutoConfigureBefore(FeignAutoConfiguration.class)public class FescarFeignClientAutoConfiguration { @Bean @Scope("prototype") @ConditionalOnClass(name = "com.netflix.hystrix.HystrixCommand") @ConditionalOnProperty(name = "feign.hystrix.enabled", havingValue = "true") Feign.Builder feignHystrixBuilder(BeanFactory beanFactory) { return FescarHystrixFeignBuilder.builder(beanFactory); } @Bean @Scope("prototype") @ConditionalOnClass(name = "com.alibaba.csp.sentinel.SphU") @ConditionalOnProperty(name = "feign.sentinel.enabled", havingValue = "true") Feign.Builder feignSentinelBuilder(BeanFactory beanFactory) { return FescarSentinelFeignBuilder.builder(beanFactory); } @Bean @ConditionalOnMissingBean @Scope("prototype") Feign.Builder feignBuilder(BeanFactory beanFactory) { return FescarFeignBuilder.builder(beanFactory); } @Configuration protected static class FeignBeanPostProcessorConfiguration { @Bean FescarBeanPostProcessor fescarBeanPostProcessor( FescarFeignObjectWrapper fescarFeignObjectWrapper) { return new FescarBeanPostProcessor(fescarFeignObjectWrapper); } @Bean FescarContextBeanPostProcessor fescarContextBeanPostProcessor( BeanFactory beanFactory) { return new FescarContextBeanPostProcessor(beanFactory); } @Bean FescarFeignObjectWrapper fescarFeignObjectWrapper(BeanFactory beanFactory) { return new FescarFeignObjectWrapper(beanFactory); } }}
FescarFeignClientAutoConfiguration 在存在 Client.class 時生效,且要求作用在 FeignAutoConfiguration 之前。由於FeignClientsConfiguration 是在 FeignAutoConfiguration 生成 FeignContext 生效的,所以根據依賴關係, FescarFeignClientAutoConfiguration 同樣早於 FeignClientsConfiguration。
FescarFeignClientAutoConfiguration 自定義了 Feign.Builder,針對於 feign.sentinel,feign.hystrix 和 feign 的情況做了適配,目的是自定義 feign 中 Client 的真正實現為 FescarFeignClient。
HystrixFeign.builder.retryer(Retryer.NEVER_RETRY) .client(new FescarFeignClient(beanFactory))SentinelFeign.builder.retryer(Retryer.NEVER_RETRY) .client(new FescarFeignClient(beanFactory));Feign.builder.client(new FescarFeignClient(beanFactory));
FescarFeignClient 是對原來的 Feign 客戶端代理增強,具體代碼見下圖:
public class FescarFeignClient implements Client { private final Client delegate; private final BeanFactory beanFactory; FescarFeignClient(BeanFactory beanFactory) { this.beanFactory = beanFactory; this.delegate = new Client.Default(null, null); } FescarFeignClient(BeanFactory beanFactory, Client delegate) { this.delegate = delegate; this.beanFactory = beanFactory; } @Override public Response execute(Request request, Request.Options options) throws IOException { Request modifiedRequest = getModifyRequest(request); try { return this.delegate.execute(modifiedRequest, options); } finally { } } private Request getModifyRequest(Request request) { String xid = RootContext.getXID; if (StringUtils.isEmpty(xid)) { return request; } Map<String, Collection> headers = new HashMap; headers.putAll(request.headers); List fescarXid = new ArrayList; fescarXid.add(xid); headers.put(RootContext.KEY_XID, fescarXid); return Request.create(request.method, request.url, headers, request.body, request.charset); }
上面的過程中我們可以看到,FescarFeignClient 對原來的 Request 做了修改,它首先將 XID 從當前的事務上下文中取出,如果 XID 不為空的情況下,將 XID 放到了 Header 中。
FeignBeanPostProcessorConfiguration 定義了3個bean:FescarContextBeanPostProcessor、FescarBeanPostProcessor 和 FescarFeignObjectWrapper。其中 FescarContextBeanPostProcessor FescarBeanPostProcessor 實現了Spring BeanPostProcessor 接口。
以下為 FescarContextBeanPostProcessor 實現。
@Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { if (bean instanceof FeignContext && !(bean instanceof FescarFeignContext)) { return new FescarFeignContext(getFescarFeignObjectWrapper, (FeignContext) bean); } return bean; } @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { return bean; }
BeanPostProcessor 中的兩個方法可以對 Spring 容器中的 Bean 做前後處理,postProcessBeforeInitialization 處理時機是初始化之前,postProcessAfterInitialization 的處理時機是初始化之後,這2個方法的返回值可以是原先生成的實例 bean,或者使用 wrapper 包裝後的實例。
FescarContextBeanPostProcessor 將 FeignContext 包裝成 FescarFeignContext。
FescarBeanPostProcessor 將 FeignClient 根據是否繼承了LoadBalancerFeignClient 包裝成 FescarLoadBalancerFeignClient 和 FescarFeignClient。
FeignAutoConfiguration 中的 FeignContext 並沒有加 ConditionalOnXXX 的條件,所以 Fescar 採用預置處理的方式將 FeignContext 包裝成 FescarFeignContext。
@Bean public FeignContext feignContext { FeignContext context = new FeignContext; context.setConfigurations(this.configurations); return context; }
而對於 Feign Client,FeignClientFactoryBean 中會獲取 FeignContext 的實例對象。對於開發者採用 @Configuration 註解的自定義配置的 Feign Client 對象,這裡會被配置到 builder,導致 FescarFeignBuilder 中增強後的 FescarFeignCliet 失效。FeignClientFactoryBean 中關鍵代碼如下:
/** * @param the target type of the Feign client * @return a {@link Feign} client created with the specified data and the context information */ T getTarget { FeignContext context = applicationContext.getBean(FeignContext.class); Feign.Builder builder = feign(context); if (!StringUtils.hasText(this.url)) { if (!this.name.startsWith("http")) { url = "http://" this.name; } else { url = this.name; } url = cleanPath; return (T) loadBalance(builder, context, new HardCodedTarget(this.type, this.name, url)); } if (StringUtils.hasText(this.url) && !this.url.startsWith("http")) { this.url = "http://" this.url; } String url = this.url cleanPath; Client client = getOptional(context, Client.class); if (client != null) { if (client instanceof LoadBalancerFeignClient) { // not load balancing because we have a url, // but ribbon is on the classpath, so unwrap client = ((LoadBalancerFeignClient)client).getDelegate; } builder.client(client); } Targeter targeter = get(context, Targeter.class); return (T) targeter.target(this, builder, context, new HardCodedTarget( this.type, this.name, url)); }
上述代碼根據是否指定了註解參數中的 URL 來選擇直接調用 URL 還是走負載均衡,targeter.target 通過動態代理創建對象。大致過程為:將解析出的feign方法放入map,再通過將其作為參數傳入生成InvocationHandler,進而生成動態代理對象。
FescarContextBeanPostProcessor 的存在,即使開發者對 FeignClient 自定義操作,依舊可以完成 Fescar 所需的全局事務的增強。
對於 FescarFeignObjectWrapper,我們重點關注下Wrapper方法:
Object wrap(Object bean) { if (bean instanceof Client && !(bean instanceof FescarFeignClient)) { if (bean instanceof LoadBalancerFeignClient) { LoadBalancerFeignClient client = ((LoadBalancerFeignClient) bean); return new FescarLoadBalancerFeignClient(client.getDelegate, factory, clientFactory, this.beanFactory); } return new FescarFeignClient(this.beanFactory, (Client) bean); } return bean; }
wrap 方法中,如果 bean 是 LoadBalancerFeignClient 的實例對象,那麼首先通過 client.getDelegate 方法將 LoadBalancerFeignClient 代理的實際 Client 對象取出後包裝成 FescarFeignClient,再生成 LoadBalancerFeignClient 的子類 FescarLoadBalancerFeignClient 對象。如果 bean 是 Client 的實例對象且不是 FescarFeignClient LoadBalancerFeignClient,那麼 bean 會直接包裝生成 FescarFeignClient。
上面的流程設計還是比較巧妙的,首先根據 Spring boot 的 Auto Configuration 控制了配置的先後順序,同時自定義了 Feign Builder的Bean,保證了 Client 均是經過增強後的 FescarFeignClient 。再通過 BeanPostProcessor 對Spring 容器中的 Bean 做了一遍包裝,保證容器內的Bean均是增強後 FescarFeignClient ,避免 FeignClientFactoryBean getTarget 方法的替換動作。
Hystrix 隔離
下面我們再來看下 Hystrix 部分,為什麼要單獨把 Hystrix 拆出來看呢,而且 Fescar 代碼也單獨實現了個策略類。目前事務上下文 RootContext 的默認實現是基於 ThreadLocal 方式的 ThreadLocalContextCore,也就是上下文其實是和線程綁定的。Hystrix 本身有兩種隔離狀態的模式,基於信號量或者基於線程池進行隔離。Hystrix 官方建議是採取線程池的方式來充分隔離,也是一般情況下在採用的模式:
Thread or SemaphoreThe default, and the recommended setting, is to run HystrixCommands using thread isolation (THREAD) and HystrixObservableCommands using semaphore isolation (SEMAPHORE).Commands executed in threads have an extra layer of protection against latencies beyond what network timeouts can offer.Generally the only time you should use semaphore isolation for HystrixCommands is when the call is so high volume (hundreds per second, per instance) that the overhead of separate threads is too high; this typically only applies to non-network calls.
service 層的業務代碼和請求發出的線程肯定不是同一個,那麼 ThreadLocal 的方式就沒辦法將 XID 傳遞給 Hystrix 的線程並傳遞給被調用方的。怎麼處理這件事情呢,Hystrix 提供了個機制讓開發者去自定義並發策略,只需要繼承 HystrixConcurrencyStrategy 重寫 wrapCallable 方法即可。
public class FescarHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy { private HystrixConcurrencyStrategy delegate; public FescarHystrixConcurrencyStrategy { this.delegate = HystrixPlugins.getInstance.getConcurrencyStrategy; HystrixPlugins.reset; HystrixPlugins.getInstance.registerConcurrencyStrategy(this); } @Override public Callable wrapCallable(Callable c) { if (c instanceof FescarContextCallable) { return c; } Callable wrappedCallable; if (this.delegate != null) { wrappedCallable = this.delegate.wrapCallable(c); } else { wrappedCallable = c; } if (wrappedCallable instanceof FescarContextCallable) { return wrappedCallable; } return new FescarContextCallable(wrappedCallable); } private static class FescarContextCallable implements Callable { private final Callable actual; private final String xid; FescarContextCallable(Callable actual) { this.actual = actual; this.xid = RootContext.getXID; } @Override public K call throws Exception { try { RootContext.bind(xid); return actual.call; } finally { RootContext.unbind; } } }}
Fescar 也提供一個 FescarHystrixAutoConfiguration,在存在 HystrixCommand 的時候生成FescarHystrixConcurrencyStrategy。
@Configuration@ConditionalOnClass(HystrixCommand.class)public class FescarHystrixAutoConfiguration { @Bean FescarHystrixConcurrencyStrategy fescarHystrixConcurrencyStrategy { return new FescarHystrixConcurrencyStrategy; }}
參考資料Fescar: https://github.com/alibaba/fescarSpring Cloud Alibaba: https://github.com/spring-cloud-incubator/spring-cloud-alibabaspring-cloud-openfeign: https://github.com/spring-cloud/spring-cloud-openfeign本文作者郭樹抗,社區暱稱 ywind,曾就職於華為終端雲,現搜狐智能媒體中心Java工程師,目前主要負責搜狐號相關開發,對分布式事務、分布式系統和微服務架構有異常濃厚的興趣。
季敏(清銘),社區暱稱 slievrly,Fescar 開源項目負責人,阿里巴巴中件間 TXC/GTS 核心研發成員,長期從事於分布式中間件核心研發工作,在分布式事務領域有著較豐富的技術積累。
延伸閱讀微服務架構下,解決數據一致性問題的實踐
https://mp.weixin.qq.com/s
作者:中間件小哥
,






![2022愛方向和生日是在[質量個性]中](http://img.xinsiji.cc/20220215/1604989894118215680.jpg)



