我们常用aop实现传统的同步日志记录。如果业务中有涉及到就没法做到了。或者就是使用强耦合的方式来实现。但是这样就和AOP实现日志记录成了两套体系。
业务中常用*@Async注解,控制异步.其原理是启动时扫描@Async注解对象,为其生成代理类,在代理中通过Executor*线程池来执行异步任务。
那么要检测业务方法是否处理成功,就是就是要检测线程中run方法是否执行成功。
这里只展示核心代码,其他非核心注解定义、切面实现不做过多展示。需要强调的是,操作日志注解相比传统注解。增加了是否是异步请求标识。
重写AsyncExecutionInterceptor.invoke方法。在异步任务执行成功和失败时发布事件。AOP统一类中进行事件监听。
发布任务事件
@Override
@Nullable
public Object invoke(final MethodInvocation invocation) throws Throwable {
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
if (executor == null) {
throw new IllegalStateException(
"No executor specified and no default executor set on AsyncExecutionInterceptor either");
}
//获取请求开始设置的标识符,后面实现,需要用标识符判断当前异步任务是否需要记录操作日志
Map<String, Object> map = Maps.newHashMap();
ServletRequestAttributes servletRequest = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
if (servletRequest != null) {
HttpServletRequest request = servletRequest.getRequest();
Enumeration<String> attributeNames = request.getAttributeNames();
//https:///dawnStart/article/details/133637582
while (attributeNames.hasMoreElements()) {
String element = attributeNames.nextElement();
Object attribute = request.getAttribute(element);
map.put(element, attribute);
}
}
//创建异步任务
Callable<Object> task = () -> {
try {
Object result = invocation.proceed();
if (result instanceof Future) {
return ((Future<?>) result).get();
}
//异步任务执行成功
SpringUtil.getApplicationContext().publishEvent(new AsyncFinishEvent("异步执行成功", map));
} catch (ExecutionException ex) {
handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
//异步任务执行失败
SpringUtil.getApplicationContext().publishEvent(new AsyncUncaughtEvent("异步执行失败", ex, map));
} catch (Throwable ex) {
handleError(ex, userDeclaredMethod, invocation.getArguments());
//异步任务执行失败
SpringUtil.getApplicationContext().publishEvent(new AsyncUncaughtEvent("异步执行失败", ex, map));
}
return null;
};
//异步任务提交至线程池执行
return doSubmit(task, executor, invocation.getMethod().getReturnType());
}
异步任务结果监听
/**
* 异步任务执行失败 事件
*
* @param asyncUncaughtEvent asyncUncaughtEvent
*/
@EventListener
public void asyncFailProcessEvent(AsyncUncaughtEvent asyncUncaughtEvent) {
if (Objects.isNull(asyncUncaughtEvent)) {
return;
}
//OPERATE_TYPE标识符判断当前异步是否需要记录操作日志
Map<String, Object> attributes = asyncUncaughtEvent.getAttributes();
if (MapUtils.isNotEmpty(attributes) && attributes.containsKey(OPERATE_TYPE)) {
syncSave(FAIL);
}
}
/**
* 异步成功事件
*
* @param asyncFinishEvent asyncFinishEvent
*/
@EventListener
public void asyncSuccessProcessEvent(AsyncFinishEvent asyncFinishEvent) {
if (Objects.isNull(asyncFinishEvent)) {
return;
}
//OPERATE_TYPE标识符判断当前异步是否需要记录操作日志
Map<String, Object> attributes = asyncFinishEvent.getAttributes();
if (MapUtils.isNotEmpty(attributes) && attributes.containsKey(OPERATE_TYPE)) {
syncSave(SUCCESS);
}
}
指定@Async执行的线程池,然后重写执行线程池,在重写的线程池中,进行日志记录相关操作。可以参考以下代码,以后代码主要的功能为,在异步执行时,进行上下文信息传递,代码不全仅供参考。
@Bean(name = "applicationTaskExecutor")
@Primary
public ThreadPoolTaskExecutor taskExecutor() {
//这里使用重写的线程池
ThreadPoolTaskExecutor executor = new IdThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setThreadNamePrefix("Id-Base-ExecutorThread-");
executor.initialize();
return executor;
}
@Override
public Executor getAsyncExecutor() {
return taskExecutor();
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
RequestHolder.remove();
return new MyAsyncUncaughtExceptionHandler();
}
//定义执行线程
private static class IdCallable<T> implements Callable<T> {
private final Callable<T> task;
private final RequestAttributes context;
private final RequestHeaderDto requestHeader;
public IdCallable(Callable<T> task, RequestAttributes context, RequestHeaderDto requestHeader) {
this.task = task;
this.context = context;
this.requestHeader = requestHeader;
}
@Override
@SuppressWarnings("all")
public T call() throws Exception {
try {
if (requestHeader != null) {
RequestHolder.setHeaderDto(requestHeader);
}
if (context != null) {
RequestContextHolder.setRequestAttributes(context, true);
}
return task.call();
} finally {
//RequestContextHolder.resetRequestAttributes();
RequestHolder.remove();
}
}
}
private static class IdThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
//重新任务提交
@Override
@SuppressWarnings("all")
public <T> Future<T> submit(Callable<T> task) {
RequestAttributes requestAttributes = null;
try {
//非web请求获取不到
requestAttributes = RequestContextHolder.currentRequestAttributes();
} catch (Exception ignored) {
}
return super.submit(new IdCallable<>(task, requestAttributes, RequestHolder.getRequestHeader()));
}
//重新任务提交
@Override
@SuppressWarnings("all")
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
RequestAttributes requestAttributes = null;
try {
//非web请求获取不到
requestAttributes = RequestContextHolder.currentRequestAttributes();
} catch (Exception ignored) {
}
return super.submitListenable(new IdCallable<>(task, requestAttributes, RequestHolder.getRequestHeader()));
}
}