# Resilience4j ThreadPoolBulkhead 租户上下文跨线程传播问题排查指南 > **适用场景**:Spring Cloud + OpenFeign + Resilience4j(ThreadPoolBulkhead)+ TransmittableThreadLocal(TTL) --- ## 1. 问题现象 ### 1.1 典型日志特征 ``` # HTTP 线程正确设置了租户上下文 [nio-9301-exec-3] GlobalContextFilter : 租户上下文已设置: tenantId=5 # Feign 调用线程(线程池线程)却读取到错误的租户 ID [pool-5-thread-1] OAuthRequestInterceptor : Feign 透传租户 ID: tenantId=4 # 后续请求无论 X-Tenant-Id 是多少,线程池线程始终返回第一次的 tenantId=4 [nio-9301-exec-4] GlobalContextFilter : 租户上下文已设置: tenantId=51 [pool-5-thread-1] OAuthRequestInterceptor : Feign 透传租户 ID: tenantId=4 ← 仍然是 4! ``` ### 1.2 核心特征 | 现象 | 说明 | |------|------| | HTTP 线程上下文正确 | `TenantContextHolder.getTenantId()` 在 Controller/Filter 中返回正确值 | | 线程池线程上下文错误 | Feign 拦截器或 Service 中读取到旧值或 `null` | | 旧值具有"粘性" | 线程池线程复用后,始终残留第一次被创建时的上下文值 | | 与请求头不一致 | 请求头 `X-Tenant-Id` 变化,但业务线程读取的值不变 | --- ## 2. 问题根因 ### 2.1 架构背景 本项目使用以下技术栈: - **租户上下文**:`TenantContextHolder` 基于 `TransmittableThreadLocal`(TTL)实现 - **服务间调用**:OpenFeign + Spring Cloud LoadBalancer - **熔断隔离**:Spring Cloud Circuit Breaker + Resilience4j `ThreadPoolBulkhead` - **线程池隔离目的**:限制并发数,防止故障扩散 ### 2.2 线程切换链路(关键!) 当 Feign 调用触发 Circuit Breaker + ThreadPoolBulkhead 时,一次请求会经历 **三层线程**: ``` ┌─────────────────┐ ┌─────────────────────────┐ ┌─────────────────────────┐ │ HTTP 线程 │ │ ThreadPoolBulkhead 线程 │ │ CircuitBreakerFactory │ │ [nio-9301-exec] │ ──▶ │ [bulkhead-xxx-thread] │ ──▶ │ ExecutorService 线程 │ │ │ │ │ │ [pool-N-thread-M] │ └─────────────────┘ └─────────────────────────┘ └─────────────────────────┘ │ │ │ │ ① TTL 自动透传 │ ② ContextPropagator 恢复 │ ③ ??? 上下文丢失 │ (原生 ThreadLocal │ (Resilience4j 官方机制) │ │ 不跨线程池) │ │ │ │ │ tenantId=5 tenantId=5 ✓ tenantId=4 ✗ ``` ### 2.3 为什么 ContextPropagator 不够? Resilience4j 提供了 `ContextPropagator` 接口,官方设计目的是在 **ThreadPoolBulkhead 线程池** 内透传上下文: - `retrieve()`:在调用方线程捕获上下文值 - `copy()`:在线程池线程恢复上下文值 - `clear()`:在线程池线程清理上下文值 **但 Spring Cloud Circuit Breaker 内部还有一层线程池!** 查看 `Resilience4JCircuitBreaker.run()` 源码: ```java if (executorService != null) { // ① 先把任务提交到工厂自己的 ExecutorService(newCachedThreadPool) Supplier> futureSupplier = () -> executorService.submit(toRun::get); // ② 再用 ThreadPoolBulkhead 包装 Future 等待逻辑 Callable bulkheadCall = bulkheadProvider.decorateCallable(..., timeLimitedCall); ... } ``` **执行流程变成了:** 1. HTTP 线程提交任务 → `ThreadPoolBulkhead` 线程池 2. `ThreadPoolBulkhead` 线程执行 `ContextPropagator.copy()` → 恢复 `tenantId=5` 3. `ThreadPoolBulkhead` 线程调用 `executorService.submit(toRun::get)` → 提交到 **第二个线程池** 4. `executorService` 线程(`pool-5-thread-1`)执行 Feign 调用 5. `executorService` 线程 **没有** 经过 `ContextPropagator` 恢复,其 `ThreadLocal` 为 `null` 6. 但由于 `TransmittableThreadLocal` 继承 `InheritableThreadLocal`,线程创建时可能继承了父线程的值,且 **永不清理**,导致旧值残留 ### 2.4 为什么旧值有"粘性"? `Resilience4JCircuitBreakerFactory` 默认使用 `Executors.newCachedThreadPool()`: - 线程创建时继承父线程(`ThreadPoolBulkhead` 线程)的 `InheritableThreadLocal` - 线程被缓存复用,永不销毁(空闲 60 秒) - **没有人清理** `executorService` 线程的 `ThreadLocal` - 因此该线程永远携带第一次被创建时的 `tenantId` --- ## 3. 排查思路(按优先级) ### Step 1:确认问题范围 检查日志中 Feign 调用所在的线程名: ``` # 如果是 ThreadPoolBulkhead 线程,命名类似: [bulkhead-xxx-1] # 如果是 Spring 默认线程池,命名类似: [pool-5-thread-1] ``` 如果看到 `[pool-N-thread-M]`,说明问题在 **第二层线程切换**。 ### Step 2:确认 ContextPropagator 是否生效 在 `TenantContextPropagator` 的 `retrieve()` / `copy()` / `clear()` 方法中加日志: ```java @Override public Supplier> retrieve() { return () -> { Long tenantId = TenantContextHolder.getTenantId(); log.info("[ContextPropagator] retrieve: tenantId={} in thread={}", tenantId, Thread.currentThread().getName()); ... }; } ``` - 如果 `retrieve()` 日志不打印 → `ContextPropagator` 未被注册到 ThreadPoolBulkheadConfig - 如果 `copy()` 打印的线程名是 `[pool-N-thread-M]` → `ContextPropagator` 被用在了错误的线程池上(本不应出现) ### Step 3:确认是否存在多层线程池 在 `OAuthRequestInterceptor` 中加日志: ```java Long tenantId = TenantContextHolder.getTenantId(); log.info("[Feign] 当前线程={}, tenantId={}", Thread.currentThread().getName(), tenantId); ``` 对比 HTTP 线程的 `tenantId` 和 Feign 线程的 `tenantId`: | HTTP 线程 | Feign 线程 | 结论 | |-----------|-----------|------| | 5 | 5 | 正常 | | 5 | null | 上下文完全丢失 | | 5 | 4 | 旧值残留(本文档描述的问题) | | 5 | 51(上一次的值)| 线程复用且未清理 | ### Step 4:检查 ThreadPoolBulkheadConfig 配置 断点或日志打印 `ThreadPoolBulkheadConfig.getContextPropagator()`: ```java ThreadPoolBulkheadConfig config = threadPoolBulkheadRegistry.getDefaultConfig(); log.info("配置中的 ContextPropagator: {}", config.getContextPropagator()); ``` - 如果为空列表 → 配置未生效 - 如果有 `TenantContextPropagator` → 配置已生效,但只能解决第一层线程切换 --- ## 4. 完整修复方案 ### 4.1 方案概览 需要 **三个层面的修复** 协同工作: | 层面 | 修复目标 | 组件 | |------|---------|------| | 第一层 | HTTP 线程 → ThreadPoolBulkhead 线程 | `TenantContextPropagator` + `TenantContextThreadPoolBulkheadConfigCustomizer` | | 第二层 | ThreadPoolBulkhead 线程 → ExecutorService 线程 | `TtlExecutors` + `TtlResilience4JCircuitBreakerFactoryCustomizer` | | 注册层 | Feign 客户端被 Spring 正确扫描 | `META-INF/spring.factories` 注册 Feign 接口 | ### 4.2 第一层:ThreadPoolBulkhead 内上下文传播 **组件**:`TenantContextPropagator`(已存在)+ `TenantContextThreadPoolBulkheadConfigCustomizer`(新增 BeanPostProcessor) **原理**: - Resilience4j `ThreadPoolBulkhead` 内部使用 `ContextPropagator.decorateSupplier()` 包装任务 - 在任务提交时 `retrieve()` 捕获上下文,在线程池线程执行前 `copy()` 恢复,执行后 `clear()` 清理 **实现要点**: - `TenantContextThreadPoolBulkheadConfigCustomizer` 实现 `BeanPostProcessor` - 在 `ThreadPoolBulkheadRegistry` 初始化后,通过反射修改其默认配置,注入 `TenantContextPropagator` - 不能使用 `AbstractRegistry.addConfiguration("default", config)`,因为该方法禁止修改 `"default"` 键 - 必须直接修改内部的 `configurations` Map ```java public class TenantContextThreadPoolBulkheadConfigCustomizer implements BeanPostProcessor { @Override public Object postProcessAfterInitialization(Object bean, String beanName) { if (bean instanceof ThreadPoolBulkheadRegistry registry) { injectContextPropagator(registry); } return bean; } @SuppressWarnings("unchecked") private void injectContextPropagator(ThreadPoolBulkheadRegistry registry) { try { Field configurationsField = AbstractRegistry.class.getDeclaredField("configurations"); configurationsField.setAccessible(true); Map configurations = (Map) configurationsField.get(registry); ThreadPoolBulkheadConfig defaultConfig = configurations.get("default"); if (defaultConfig == null || hasTenantPropagator(defaultConfig)) { return; } ThreadPoolBulkheadConfig newDefaultConfig = ThreadPoolBulkheadConfig.from(defaultConfig) .contextPropagator(TenantContextPropagator.class) .build(); configurations.put("default", newDefaultConfig); } catch (NoSuchFieldException | IllegalAccessException e) { throw new IllegalStateException("无法修改 ThreadPoolBulkheadRegistry 的默认配置", e); } } } ``` **注册为 Spring Bean**(注意用 `static` 方法避免 BeanPostProcessor 警告): ```java @Bean @ConditionalOnMissingBean public static TenantContextThreadPoolBulkheadConfigCustomizer tenantContextThreadPoolBulkheadConfigCustomizer() { return new TenantContextThreadPoolBulkheadConfigCustomizer(); } ``` ### 4.3 第二层:CircuitBreakerFactory ExecutorService 上下文传播 **组件**:`TtlResilience4JCircuitBreakerFactoryCustomizer`(新增 Customizer) **原理**: - 使用 Alibaba `TtlExecutors` 包装 `ExecutorService` - `TtlExecutors` 在 `submit()` 时自动捕获当前线程的 `TransmittableThreadLocal` 值 - 在目标线程执行前自动恢复,执行后自动清理 - 支持线程池复用场景,无旧值残留 **实现**: ```java public class TtlResilience4JCircuitBreakerFactoryCustomizer implements Customizer { @Override public void customize(Resilience4JCircuitBreakerFactory factory) { factory.configureExecutorService( TtlExecutors.getTtlExecutorService(Executors.newCachedThreadPool()) ); factory.configureGroupExecutorService( group -> TtlExecutors.getTtlExecutorService(Executors.newCachedThreadPool()) ); } } ``` **注册为 Spring Bean**: ```java @Bean @ConditionalOnMissingBean public static TtlResilience4JCircuitBreakerFactoryCustomizer ttlResilience4JCircuitBreakerFactoryCustomizer() { return new TtlResilience4JCircuitBreakerFactoryCustomizer(); } ``` ### 4.4 第三层:Feign 客户端注册 **组件**:`META-INF/spring.factories` **原理**: - 本项目使用自定义的 `CustomFeignClientsRegistrar` 注册 Feign 客户端 - `CustomFeignClientsRegistrar` 从 `SpringFactoriesLoader.loadFactoryNames(CloudFeignAutoConfiguration.class, classLoader)` 加载 Feign 接口类名 - 如果 Feign 接口未在 `spring.factories` 中注册,Spring 容器中不会出现该 Bean,导致 `NoSuchBeanDefinitionException` **实现**: 在定义 Feign 接口的模块(如 `rui-common-security`)新增 `META-INF/spring.factories`: ```properties com.rui.common.feign.CloudFeignAutoConfiguration=\ com.rui.common.security.feign.TokenManageFeign ``` ### 4.5 模块依赖关系 确保 `rui-common-security` 添加 `rui-common-feign` 依赖: ```xml com.rui rui-common-feign ``` 否则 `spring.factories` 中引用的 `CloudFeignAutoConfiguration` 类在编译期不可见。 --- ## 5. 验证方法 ### 5.1 本地验证 1. 启动 `rui-auth` 和 `rui-service-system` 2. 发送第一次请求:`X-Tenant-Id: 5` 3. 观察 Feign 调用日志,确认 `tenantId=5` 4. 发送第二次请求:`X-Tenant-Id: 51` 5. 观察 Feign 调用日志,确认 `tenantId=51`(不是 5) 6. 发送第三次请求:`X-Tenant-Id: 3` 7. 观察 Feign 调用日志,确认 `tenantId=3`(不是 5 也不是 51) ### 5.2 关键日志断言 ```java // 断言:HTTP 线程和 Feign 线程的 tenantId 必须一致 assertEquals("HTTP 线程和 Feign 线程的租户 ID 必须一致", httpTenantId, feignTenantId); // 断言:每次请求的 tenantId 必须不同(如果请求头不同) assertNotEquals("线程池线程不应残留旧租户 ID", previousTenantId, currentTenantId); ``` ### 5.3 断点验证 在以下位置打断点,单步跟踪: 1. `TenantContextPropagator.retrieve()` — 确认每次请求捕获的值不同 2. `TenantContextPropagator.copy()` — 确认在线程池线程恢复的值正确 3. `TtlExecutors` 内部 — 确认 `executorService` 线程恢复的值正确 4. `OAuthRequestInterceptor` — 确认最终 Feign 调用时的值正确 --- ## 6. 常见问题 FAQ ### Q1:为什么 YAML 中配置 `resilience4j.thread-pool-bulkhead.configs.default.contextPropagators` 不生效? **A**:该配置依赖 Spring Boot `ConfigurationProperties` 绑定 `Class[]` 类型。虽然 `CommonThreadPoolBulkheadConfigurationProperties` 支持该属性,但: 1. Spring Cloud Circuit Breaker 动态创建的 Bulkhead 实例名不确定 2. `CompositeCustomizer` 按实例名精确匹配,不存在通配符机制 3. 因此编程式注入(`BeanPostProcessor`)更可靠 ### Q2:只用 `TtlExecutors` 不用 `ContextPropagator` 可以吗? **A**:不可以。`TtlExecutors` 只能透传标准 `ThreadPoolExecutor` 的任务提交。Resilience4j 的 `ThreadPoolBulkhead` 内部使用自己的 `ThreadPoolExecutor`,不经过 `TtlExecutors`。因此第一层切换(HTTP → ThreadPoolBulkhead)必须由 `ContextPropagator` 处理。 ### Q3:为什么 `TenantContextHolder` 使用 `TransmittableThreadLocal` 而不是普通 `ThreadLocal`? **A**:因为项目中存在 `@Async` 异步任务、Feign 线程池切换等场景。`TransmittableThreadLocal` 配合 `TtlExecutors` 可以在线程池间自动透传上下文,而普通 `ThreadLocal` 只能在线程父子间继承(且对线程池无效)。 ### Q4:如果以后引入其他线程池(如 `@Async`),是否也会遇到同样问题? **A**:是的。任何使用线程池的地方,如果任务提交方线程有 `ThreadLocal` 上下文,而执行方线程需要读取该上下文,都必须使用以下方案之一: - 用 `TtlExecutors` 包装线程池(推荐) - 手动在任务提交前捕获上下文,在任务执行前恢复(类似 `ContextPropagator` 原理) - 使用 Project Reactor 的 `Context` + `Hooks.onEachOperator`(响应式场景) **最佳实践**:所有业务线程池统一通过 `TtlExecutors` 包装。 ### Q5:如果关闭 ThreadPoolBulkhead,改用 SemaphoreBulkhead,能否避免此问题? **A**:可以。SemaphoreBulkhead 在同一线程内执行,不存在线程切换。但会牺牲线程隔离的故障保护能力。 配置方式: ```yaml spring: cloud: circuitbreaker: resilience4j: enableSemaphoreDefaultBulkhead: true ``` --- ## 7. 后续维护建议 1. **新增 Feign 客户端时**:务必在所在模块的 `META-INF/spring.factories` 中注册 2. **新增线程池时**:优先使用 `TtlExecutors.getTtlExecutorService()` 包装 3. **新增 ThreadLocal 上下文时**:考虑是否需要配套 `ContextPropagator` 4. **日志规范**:在上下文切换关键点(Filter、Interceptor、线程池任务)打印 `tenantId` + `threadName`,便于快速定位问题 5. **自动化测试**:编写并发测试,模拟多租户同时请求,断言各线程的 `tenantId` 与请求头一致 --- ## 8. 相关代码文件 | 文件 | 作用 | |------|------| | `rui-common-core/holder/TenantContextHolder.java` | 租户上下文持有者(TransmittableThreadLocal) | | `rui-common-security/feign/TokenManageFeign.java` | Feign 客户端示例 | | `rui-common-security/feign/OAuthRequestInterceptor.java` | Feign 请求拦截器(透传租户 ID) | | `rui-common-feign/propagator/TenantContextPropagator.java` | Resilience4j ContextPropagator 实现 | | `rui-common-feign/config/TenantContextThreadPoolBulkheadConfigCustomizer.java` | 注入 ContextPropagator 到 ThreadPoolBulkheadRegistry | | `rui-common-feign/config/TtlResilience4JCircuitBreakerFactoryCustomizer.java` | 用 TtlExecutors 包装 CircuitBreakerFactory 线程池 | | `rui-common-feign/CloudFeignAutoConfiguration.java` | 注册上述 Bean | | `rui-common-security/META-INF/spring.factories` | 注册 Feign 客户端 |