Skip to content

SpringRetry源码解析

本文基于spring-retry:1.2.5.RELEASE

书接上文《spring-retry入门使用》, 本文主要解析从@Retryable注解到方法重试的实现流程

先有一个整体的大致印象, 如果我们要实现spring-retry这样的框架, 大致的思路肯定是通过aop(切@Retryable注解)增强方法, 在切面里try-catch异常, 然后根据重试策略(retry)以及退避策略(backoff)进行重试, 达到阈值后调用恢复方法(recover).

RetryConfiguration配置切面

  • RetryConfiguration实现了IntroductionAdvisor接口, 后者是一个引介切面, 能够对目标对象进行增强
  • 由buildPointcut看到切点是标注了@Retryable注解的类或方法
  • 通知(advice)是由buildAdvice构建的AnnotationAwareRetryOperationsInterceptor
java
/**
 * Advisor that attaches retry behaviour to beans carrying the {@code @Retryable}
 * annotation.
 *
 * <p>On initialization it builds a pointcut from the retry annotation types and an
 * {@code AnnotationAwareRetryOperationsInterceptor} as the advice. The fields used
 * below ({@code pointcut}, {@code advice}, {@code beanFactory}, {@code retryContextCache},
 * {@code retryListeners}, {@code methodArgumentsKeyGenerator},
 * {@code newMethodArgumentsIdentifier}, {@code sleeper}) are not shown in this excerpt.
 */
@Configuration
public class RetryConfiguration extends AbstractPointcutAdvisor implements IntroductionAdvisor, BeanFactoryAware {

    /**
     * Builds the pointcut and the advice, then hands the bean factory to the advice
     * when the advice is itself {@code BeanFactoryAware}.
     */
    @PostConstruct
    public void init() {
        // Explicit type arguments instead of a raw LinkedHashSet keep the assignment type-safe.
        Set<Class<? extends Annotation>> retryableAnnotationTypes = new LinkedHashSet<Class<? extends Annotation>>(1);
        retryableAnnotationTypes.add(Retryable.class);
        this.pointcut = this.buildPointcut(retryableAnnotationTypes);
        this.advice = this.buildAdvice();
        if (this.advice instanceof BeanFactoryAware) {
            ((BeanFactoryAware) this.advice).setBeanFactory(this.beanFactory);
        }
    }

    /**
     * Creates the retry interceptor and copies over every optional collaborator that
     * has been configured on this advisor (null collaborators are skipped).
     *
     * @return the configured {@code AnnotationAwareRetryOperationsInterceptor}
     */
    protected Advice buildAdvice() {
        AnnotationAwareRetryOperationsInterceptor interceptor = new AnnotationAwareRetryOperationsInterceptor();
        if (this.retryContextCache != null) {
            interceptor.setRetryContextCache(this.retryContextCache);
        }
        if (this.retryListeners != null) {
            interceptor.setListeners(this.retryListeners);
        }
        if (this.methodArgumentsKeyGenerator != null) {
            interceptor.setKeyGenerator(this.methodArgumentsKeyGenerator);
        }
        if (this.newMethodArgumentsIdentifier != null) {
            interceptor.setNewItemIdentifier(this.newMethodArgumentsIdentifier);
        }
        if (this.sleeper != null) {
            interceptor.setSleeper(this.sleeper);
        }
        return interceptor;
    }

    /**
     * Combines one {@code AnnotationClassOrMethodPointcut} per annotation type into a
     * single pointcut via union.
     *
     * @param retryAnnotationTypes annotation types that mark a class or method as retryable
     * @return the union of the per-annotation pointcuts, or {@code null} when the set is empty
     */
    protected Pointcut buildPointcut(Set<Class<? extends Annotation>> retryAnnotationTypes) {
        ComposablePointcut result = null;
        // for-each over the typed set replaces the raw Iterator and its unchecked cast.
        for (Class<? extends Annotation> retryAnnotationType : retryAnnotationTypes) {
            Pointcut filter = new AnnotationClassOrMethodPointcut(retryAnnotationType);
            if (result == null) {
                result = new ComposablePointcut(filter);
            } else {
                result.union(filter);
            }
        }
        return result;
    }

}
  • AnnotationAwareRetryOperationsInterceptor实现了IntroductionInterceptor接口, 其invoke方法通过getDelegate获取实际的拦截器并委托执行, 获取不到时直接执行原方法
  • getDelegate方法简化如下, 简单来说去查找标识了@Retryable注解的方法, 生成一个有状态或无状态的拦截器(有状态是针对事务的场景, 每次重试需要抛出异常让事务回滚)
java
/**
 * Interceptor that resolves, per invocation, a delegate {@code MethodInterceptor}
 * based on the {@code @Retryable} annotation of the invoked method and hands the
 * invocation over to it.
 *
 * NOTE(review): this is a simplified excerpt; fields such as {@code beanFactory},
 * {@code retryContextCache}, {@code methodArgumentsKeyGenerator} and
 * {@code newMethodArgumentsIdentifier} are declared outside the shown code.
 */
public class AnnotationAwareRetryOperationsInterceptor implements IntroductionInterceptor, BeanFactoryAware {

    /**
     * Looks up the delegate for the target object and method; invokes the delegate
     * when one exists, otherwise proceeds with the plain invocation.
     */
 	public Object invoke(MethodInvocation invocation) throws Throwable {
        MethodInterceptor delegate = this.getDelegate(invocation.getThis(), invocation.getMethod());
        return delegate != null ? delegate.invoke(invocation) : invocation.proceed();
    }

    
    /**
     * Chooses the delegate interceptor for a method from its {@code @Retryable} annotation:
     * a named bean when {@code interceptor()} has text, otherwise a stateful or a
     * stateless interceptor depending on {@code stateful()}.
     *
     * NOTE(review): the remainder of the method (including the return) is elided with
     * "..." in this excerpt, and no null check on {@code retryable} is shown here -
     * confirm against the full source how a missing annotation is handled.
     */
    private MethodInterceptor getDelegate(Object target, Method method) {
        org.springframework.retry.annotation.Retryable retryable = (org.springframework.retry.annotation.Retryable)AnnotationUtils.findAnnotation(method, org.springframework.retry.annotation.Retryable.class);
        MethodInterceptor delegate;
        if (StringUtils.hasText(retryable.interceptor())) {
            // An explicitly named interceptor bean takes precedence over generated ones.
            delegate = (MethodInterceptor)this.beanFactory.getBean(retryable.interceptor(), MethodInterceptor.class);
        } else if (retryable.stateful()) {
            delegate = this.getStatefulInterceptor(target, method, retryable);
        } else {
            delegate = this.getStatelessInterceptor(target, method, retryable);
        }
        ...
    }

    /**
     * Builds a stateless retry interceptor: a template configured with the annotation's
     * listeners, retry policy and back-off policy, plus the label and the recoverer
     * resolved for the target method.
     */
    private MethodInterceptor getStatelessInterceptor(Object target, Method method, org.springframework.retry.annotation.Retryable retryable) {
        RetryTemplate template = this.createTemplate(retryable.listeners());
        template.setRetryPolicy(this.getRetryPolicy(retryable));
        template.setBackOffPolicy(this.getBackoffPolicy(retryable.backoff()));
        return RetryInterceptorBuilder.stateless().retryOperations(template).label(retryable.label()).recoverer(this.getRecoverer(target, method)).build();
    }

    /**
     * Builds a stateful interceptor. When the method also carries {@code @CircuitBreaker},
     * a circuit-breaker interceptor is built instead of the plain stateful one.
     */
    private MethodInterceptor getStatefulInterceptor(Object target, Method method, org.springframework.retry.annotation.Retryable retryable) {
        RetryTemplate template = this.createTemplate(retryable.listeners());
        template.setRetryContextCache(this.retryContextCache);
        CircuitBreaker circuit = (CircuitBreaker)AnnotationUtils.findAnnotation(method, CircuitBreaker.class);
        RetryPolicy policy;
        if (circuit != null) {
            // Circuit-breaker branch: wrap the annotation's retry policy and apply the
            // open/reset timeouts; no back-off is used between attempts.
            policy = this.getRetryPolicy(circuit);
            CircuitBreakerRetryPolicy breaker = new CircuitBreakerRetryPolicy(policy);
            breaker.setOpenTimeout(this.getOpenTimeout(circuit));
            breaker.setResetTimeout(this.getResetTimeout(circuit));
            template.setRetryPolicy(breaker);
            template.setBackOffPolicy(new NoBackOffPolicy());
            String label = circuit.label();
            if (!StringUtils.hasText(label)) {
                // Fall back to the method's generic signature when no label was given.
                label = method.toGenericString();
            }

            return RetryInterceptorBuilder.circuitBreaker().keyGenerator(new FixedKeyGenerator("circuit")).retryOperations(template).recoverer(this.getRecoverer(target, method)).label(label).build();
        } else {
            // Plain stateful branch: policy and back-off come from @Retryable; state is
            // keyed through the configured key generator and new-arguments identifier.
            policy = this.getRetryPolicy(retryable);
            template.setRetryPolicy(policy);
            template.setBackOffPolicy(this.getBackoffPolicy(retryable.backoff()));
            String label = retryable.label();
            return RetryInterceptorBuilder.stateful().keyGenerator(this.methodArgumentsKeyGenerator).newMethodArgumentsIdentifier(this.newMethodArgumentsIdentifier).retryOperations(template).label(label).recoverer(this.getRecoverer(target, method)).build();
        }
    }
}
  • 这里拿无状态的InterceptorBuilder举例, 最后构建的是RetryOperationsInterceptor
java
 /**
  * Builder specialization whose product type is {@code RetryOperationsInterceptor}
  * (see the type argument of the superclass).
  *
  * NOTE(review): only the field is shown in this excerpt; its initialization and the
  * build method are omitted here.
  */
 public static class StatelessRetryInterceptorBuilder extends RetryInterceptorBuilder<RetryOperationsInterceptor> {
     // The interceptor instance held by this builder.
     private final RetryOperationsInterceptor interceptor;
 }
  • RetryOperationsInterceptor是个方法拦截器, 在invoke方法里可以看到其核心流程, 构造一个retryCallback的匿名对象, try-catch了实际要执行的方法, 并交由retryOperations去执行
java
/**
 * Method interceptor that wraps the intercepted call in a {@code RetryCallback} and
 * lets the configured {@code retryOperations} drive the attempts.
 *
 * The {@code name}, {@code recoverer} and {@code retryOperations} members referenced
 * below are declared outside this excerpt.
 */
public class RetryOperationsInterceptor implements MethodInterceptor {

    /**
     * Executes the invocation through {@code retryOperations}. When a recoverer is
     * configured, a recovery callback built from the invocation arguments is passed
     * along as well.
     *
     * @param invocation the intercepted method invocation
     * @return whatever {@code retryOperations.execute(...)} returns
     * @throws Throwable whatever {@code retryOperations.execute(...)} throws
     */
    public Object invoke(final MethodInvocation invocation) throws Throwable {
        RetryCallback<Object, Throwable> attempt = new RetryCallback<Object, Throwable>() {
            public Object doWithRetry(RetryContext context) throws Exception {
                context.setAttribute("context.name", name);

                // Only proxy invocations can be cloned for a fresh attempt.
                if (!(invocation instanceof ProxyMethodInvocation)) {
                    throw new IllegalStateException("MethodInvocation of the wrong type detected - this should not happen with Spring AOP, so please raise an issue if you see this exception");
                }

                ProxyMethodInvocation proxied = (ProxyMethodInvocation) invocation;
                try {
                    // Each attempt proceeds on a clone of the invocation.
                    return proxied.invocableClone().proceed();
                } catch (Exception failure) {
                    throw failure;
                } catch (Error fatal) {
                    throw fatal;
                } catch (Throwable other) {
                    // Any other Throwable is wrapped so the declared signature holds.
                    throw new IllegalStateException(other);
                }
            }
        };

        if (this.recoverer == null) {
            return this.retryOperations.execute(attempt);
        }

        ItemRecovererCallback fallback = new ItemRecovererCallback(invocation.getArguments(), this.recoverer);
        return this.retryOperations.execute(attempt, fallback);
    }
}

RetryTemplate的核心执行方法

  • 接上文, retryOperations的execute方法实现简化如下, 总体思路如文章开头所讲, 根据重试策略循环调用retryCallback方法, try-catch异常, 在catch里面执行退避策略, 达到重试次数依然没成功时, 执行recovery方法
java
/**
 * Simplified excerpt of the retry template: loops over the retry callback while the
 * retry policy allows it, backs off between failed attempts, and falls through to
 * exhausted-retry handling when no further attempt is permitted.
 *
 * NOTE(review): this is an abridged listing, not compilable as shown -
 * {@code backOffContext}, {@code exhausted} and {@code lastException} are used
 * without a visible declaration.
 */
public class RetryTemplate implements RetryOperations {
     /**
      * Executes the callback with neither a recovery callback nor retry state.
      */
     public final <T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback) throws E {
        return this.doExecute(retryCallback, (RecoveryCallback)null, (RetryState)null);
    }

    // Core method; heavily simplified here
    protected <T, E extends Throwable> T doExecute(RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback, RetryState state) throws E, ExhaustedRetryException {
        // Retry policy
		RetryPolicy retryPolicy = this.retryPolicy;
        // Back-off policy
        BackOffPolicy backOffPolicy = this.backOffPolicy;
        // Open the context, which records the retry state, retry count, etc.
        RetryContext context = this.open(retryPolicy, state);
    	// Invoke the listeners' open callback; the many other listener hook calls further down are omitted, this one stands for them all
    	boolean running = this.doOpenInterceptors(retryCallback, context);
        
        while(true) {
            Object var34;
            // Decide from the context and the retry policy whether another attempt is allowed
			if (this.canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
                try {
                    var34 = retryCallback.doWithRetry(context);
                    return var34;
                } catch (Throwable var31) {
                    // NOTE(review): the excerpt does not show the failure being recorded on the
                    // context before canRetry is re-evaluated - presumably elided; confirm against the full source.

                    // If another attempt is still allowed, apply the back-off policy
					if (this.canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
                        try {
                            backOffPolicy.backOff(backOffContext);
                        } catch (BackOffInterruptedException var30) {
                            throw var30;
                        }
                    }

                    // This state check still needs further study...
                    if (state == null || !context.hasAttribute("state.global")) {
                        continue;
                    }
                }
            }

            exhausted = true;
            // Run the recovery method
            var34 = this.handleRetryExhausted(recoveryCallback, context, state);
            return var34;
        }

        // Invoke the listeners' close callbacks and clean up other resources (methods omitted)
        // NOTE(review): as written this statement follows a while(true) loop with no break, so it is
        // unreachable in this excerpt - presumably it sits in a finally block in the full source; confirm.
        this.doCloseInterceptors(retryCallback, context, lastException);
    }
}

参考

spring-retry解析: https://blog.csdn.net/qq_25582465/article/details/108975317

spring-aop解析: https://blog.csdn.net/f641385712/article/details/89303088