您的当前位置:首页正文

基于AOP的异步操作日志实现

2024-11-08 来源:个人技术集锦

背景

我们常用aop实现传统的同步日志记录。如果业务中有涉及到就没法做到了。或者就是使用强耦合的方式来实现。但是这样就和AOP实现日志记录成了两套体系。

实现

业务中常用*@Async注解,控制异步.其原理是启动时扫描@Async注解对象,为其生成代理类,在代理中通过Executor*线程池来执行异步任务。

那么要检测业务方法是否处理成功,就是就是要检测线程中run方法是否执行成功。

核心

这里只展示核心代码,其他非核心注解定义、切面实现不做过多展示。需要强调的是,操作日志注解相比传统注解。增加了是否是异步请求标识。

方案一

重写AsyncExecutionInterceptor.invoke方法。在异步任务执行成功和失败时发布事件。AOP统一类中进行事件监听。

发布任务事件

@Override
    @Nullable
    public Object invoke(final MethodInvocation invocation) throws Throwable {
        Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
        Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
        final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

        AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
        if (executor == null) {
            throw new IllegalStateException(
                    "No executor specified and no default executor set on AsyncExecutionInterceptor either");
        }
       //获取请求开始设置的标识符,后面实现,需要用标识符判断当前异步任务是否需要记录操作日志
        Map<String, Object> map = Maps.newHashMap();
        ServletRequestAttributes servletRequest = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
        if (servletRequest != null) {
            HttpServletRequest request = servletRequest.getRequest();
            Enumeration<String> attributeNames = request.getAttributeNames();
            //https:///dawnStart/article/details/133637582
            while (attributeNames.hasMoreElements()) {
                String element = attributeNames.nextElement();
                Object attribute = request.getAttribute(element);
                map.put(element, attribute);
            }
        }
       //创建异步任务
        Callable<Object> task = () -> {
            try {
                Object result = invocation.proceed();
                if (result instanceof Future) {
                    return ((Future<?>) result).get();
                }
                //异步任务执行成功
                SpringUtil.getApplicationContext().publishEvent(new AsyncFinishEvent("异步执行成功", map));
            } catch (ExecutionException ex) {
                handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
               //异步任务执行失败
                SpringUtil.getApplicationContext().publishEvent(new AsyncUncaughtEvent("异步执行失败", ex, map));
            } catch (Throwable ex) {
                handleError(ex, userDeclaredMethod, invocation.getArguments());
               //异步任务执行失败
                SpringUtil.getApplicationContext().publishEvent(new AsyncUncaughtEvent("异步执行失败", ex, map));
            }
            return null;
        };
       //异步任务提交至线程池执行
        return doSubmit(task, executor, invocation.getMethod().getReturnType());
    }

异步任务结果监听

   /**
     * 异步任务执行失败 事件
     *
     * @param asyncUncaughtEvent asyncUncaughtEvent
     */ 
  @EventListener
    public void asyncFailProcessEvent(AsyncUncaughtEvent asyncUncaughtEvent) {
        if (Objects.isNull(asyncUncaughtEvent)) {
            return;
        }
       //OPERATE_TYPE标识符判断当前异步是否需要记录操作日志
        Map<String, Object> attributes = asyncUncaughtEvent.getAttributes();
        if (MapUtils.isNotEmpty(attributes) && attributes.containsKey(OPERATE_TYPE)) {
            syncSave(FAIL);
        }
    }

    /**
     * 异步成功事件
     *
     * @param asyncFinishEvent asyncFinishEvent
     */
    @EventListener
    public void asyncSuccessProcessEvent(AsyncFinishEvent asyncFinishEvent) {
        if (Objects.isNull(asyncFinishEvent)) {
            return;
        }
        //OPERATE_TYPE标识符判断当前异步是否需要记录操作日志
        Map<String, Object> attributes = asyncFinishEvent.getAttributes();
        if (MapUtils.isNotEmpty(attributes) && attributes.containsKey(OPERATE_TYPE)) {
            syncSave(SUCCESS);
        }


    }

方案二

指定@Async执行的线程池,然后重写执行线程池,在重写的线程池中,进行日志记录相关操作。可以参考以下代码,以后代码主要的功能为,在异步执行时,进行上下文信息传递,代码不全仅供参考。


    @Bean(name = "applicationTaskExecutor")
    @Primary
    public ThreadPoolTaskExecutor taskExecutor() {
       //这里使用重写的线程池
        ThreadPoolTaskExecutor executor = new IdThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
        executor.setThreadNamePrefix("Id-Base-ExecutorThread-");
        executor.initialize();
        return executor;
    }

    @Override
    public Executor getAsyncExecutor() {
        return taskExecutor();
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        RequestHolder.remove();
        return new MyAsyncUncaughtExceptionHandler();
    }

    //定义执行线程
    private static class IdCallable<T> implements Callable<T> {
        private final Callable<T> task;
        private final RequestAttributes context;
        private final RequestHeaderDto requestHeader;

        public IdCallable(Callable<T> task, RequestAttributes context, RequestHeaderDto requestHeader) {
            this.task = task;
            this.context = context;
            this.requestHeader = requestHeader;
        }

        @Override
        @SuppressWarnings("all")
        public T call() throws Exception {
            try {
                if (requestHeader != null) {
                    RequestHolder.setHeaderDto(requestHeader);
                }
                if (context != null) {
                    RequestContextHolder.setRequestAttributes(context, true);
                }
                return task.call();

            } finally {
                //RequestContextHolder.resetRequestAttributes();
                RequestHolder.remove();
            }
        }
    }

    private static class IdThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {

       //重新任务提交
        @Override
        @SuppressWarnings("all")
        public <T> Future<T> submit(Callable<T> task) {
            RequestAttributes requestAttributes = null;
            try {
                //非web请求获取不到
                requestAttributes = RequestContextHolder.currentRequestAttributes();
            } catch (Exception ignored) {
            }
            return super.submit(new IdCallable<>(task, requestAttributes, RequestHolder.getRequestHeader()));
        }
       //重新任务提交
        @Override
        @SuppressWarnings("all")
        public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
            RequestAttributes requestAttributes = null;
            try {
                //非web请求获取不到
                requestAttributes = RequestContextHolder.currentRequestAttributes();
            } catch (Exception ignored) {
            }
            return super.submitListenable(new IdCallable<>(task, requestAttributes, RequestHolder.getRequestHeader()));
        }
    }

Top