瀏覽代碼

feat:Scheduled配置线程池支持多个任务异步运行

wangzaijun 1 月之前
父節點
當前提交
7c97393226

+ 11 - 17
mo-daq/src/main/java/com/smppw/modaq/application/service/EmailParseApiServiceImpl.java

@@ -19,8 +19,6 @@ import com.smppw.modaq.domain.mapper.MailboxInfoMapper;
 import com.smppw.modaq.domain.service.EmailParseService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Service;
 
 import java.io.BufferedReader;
@@ -44,19 +42,17 @@ public class EmailParseApiServiceImpl implements EmailParseApiService {
     private final EmailParseService emailParseService;
     private final EmailParseInfoMapper emailParseInfoMapper;
     private final EmailFileInfoMapper emailFileInfoMapper;
-    private final ThreadPoolTaskExecutor asyncExecutor;
+//    private final ThreadPoolTaskExecutor asyncExecutor;
 //    private final EmailTaskInfoMapper emailTaskInfoMapper;
 
     public EmailParseApiServiceImpl(MailboxInfoMapper mailboxInfoMapper,
                                     EmailParseService emailParseService,
                                     EmailParseInfoMapper emailParseInfoMapper,
-                                    EmailFileInfoMapper emailFileInfoMapper,
-                                    @Qualifier("asyncExecutor") ThreadPoolTaskExecutor asyncExecutor) {
+                                    EmailFileInfoMapper emailFileInfoMapper) {
         this.mailboxInfoMapper = mailboxInfoMapper;
         this.emailParseService = emailParseService;
         this.emailParseInfoMapper = emailParseInfoMapper;
         this.emailFileInfoMapper = emailFileInfoMapper;
-        this.asyncExecutor = asyncExecutor;
     }
 
     @Override
@@ -162,19 +158,17 @@ public class EmailParseApiServiceImpl implements EmailParseApiService {
 
 //        List<EmailFundNavDTO> emailFundNavDTOList = CollUtil.newArrayList();
         Map<EmailContentInfoDTO, List<EmailZipFileDTO>> emailZipFileMap = MapUtil.newHashMap();
-        asyncExecutor.execute(() -> {
-            for (EmailContentInfoDTO emailContentInfoDTO : emailContentInfoDTOList) {
-                try {
-                    List<EmailZipFileDTO> emailZipFiles = emailParseService.parseZipEmail(emailContentInfoDTO);
-                    emailZipFileMap.put(emailContentInfoDTO, emailZipFiles);
+        for (EmailContentInfoDTO emailContentInfoDTO : emailContentInfoDTOList) {
+            try {
+                List<EmailZipFileDTO> emailZipFiles = emailParseService.parseZipEmail(emailContentInfoDTO);
+                emailZipFileMap.put(emailContentInfoDTO, emailZipFiles);
 //                    emailFundNavDTOList.addAll(fundNavDTOList);
-                } catch (Exception e) {
-                    log.error("重新解析邮件失败,邮件id:{},堆栈信息:{}", emailId, ExceptionUtil.stacktraceToString(e));
-                }
+            } catch (Exception e) {
+                log.error("重新解析邮件失败,邮件id:{},堆栈信息:{}", emailId, ExceptionUtil.stacktraceToString(e));
             }
-            // 保存相关信息 -> 邮件信息表,邮件文件表,邮件净值表,邮件规模表,基金净值表
-            emailParseService.saveRelatedTable(null, emailParseInfoDO.getEmail(), emailZipFileMap);
-        });
+        }
+        // 保存相关信息 -> 邮件信息表,邮件文件表,邮件净值表,邮件规模表,基金净值表
+        emailParseService.saveRelatedTable(null, emailParseInfoDO.getEmail(), emailZipFileMap);
     }
 //
 //    @Override

+ 25 - 0
mo-daq/src/main/java/com/smppw/modaq/infrastructure/config/AsyncSchedulerConfig.java

@@ -0,0 +1,25 @@
+package com.smppw.modaq.infrastructure.config;
+
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.scheduling.annotation.SchedulingConfigurer;
+import org.springframework.scheduling.config.ScheduledTaskRegistrar;
+
+import java.util.concurrent.Executor;
+
+@Configuration
+@EnableScheduling
+public class AsyncSchedulerConfig implements SchedulingConfigurer {
+
+    private final Executor asyncExecutor;
+
+    public AsyncSchedulerConfig(@Qualifier("asyncExecutor") Executor asyncExecutor) {
+        this.asyncExecutor = asyncExecutor;
+    }
+
+    @Override
+    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
+        taskRegistrar.setScheduler(this.asyncExecutor);
+    }
+}

+ 0 - 28
mo-daq/src/main/java/com/smppw/modaq/infrastructure/config/DataSourceConfiguration.java

@@ -1,28 +0,0 @@
-//package com.smppw.modaq.infrastructure.config;
-//
-//import com.zaxxer.hikari.HikariDataSource;
-//import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
-//import org.springframework.boot.context.properties.ConfigurationProperties;
-//import org.springframework.context.annotation.Bean;
-//import org.springframework.context.annotation.Configuration;
-//import org.springframework.context.annotation.Primary;
-//
-//@Configuration
-//public class DataSourceConfiguration {
-//    public static final String DATA_DAQ_PROPERTIES = "spring.datasource.data-daq";
-//    public static final String DS_DATA_DAQ = "spring.datasource.data-daq.hikari";
-//
-//    @Primary
-//    @Bean(name = DATA_DAQ_PROPERTIES)
-//    @ConfigurationProperties(prefix = DATA_DAQ_PROPERTIES)
-//    public DataSourceProperties daqDataSourceProperties() {
-//        return new DataSourceProperties();
-//    }
-//
-//    @Primary
-//    @Bean(name = DS_DATA_DAQ)
-//    @ConfigurationProperties(prefix = DS_DATA_DAQ)
-//    public HikariDataSource dataTrustDataSource() {
-//        return daqDataSourceProperties().initializeDataSourceBuilder().type(HikariDataSource.class).build();
-//    }
-//}

+ 1 - 1
mo-daq/src/main/java/com/smppw/modaq/infrastructure/config/ThreadPoolConfig.java

@@ -35,7 +35,7 @@ public class ThreadPoolConfig {
     @Bean("asyncExecutor")
     public ThreadPoolTaskExecutor asyncExecutor() {
         ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
-        taskExecutor.setCorePoolSize(2);
+        taskExecutor.setCorePoolSize(5);
         taskExecutor.setMaxPoolSize(5);
         taskExecutor.setQueueCapacity(50);
         taskExecutor.setKeepAliveSeconds(60);