package cn.iocoder.yudao.module.digitalcourse.manager; import cn.iocoder.yudao.module.digitalcourse.model.MediaTask; import cn.iocoder.yudao.module.digitalcourse.service.coursemedia.CourseMediaServiceUtil; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import jakarta.annotation.Resource; import org.springframework.stereotype.Component; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardCopyOption; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; /** * @author kanglujie * @date 2025-04-07 14:36:00 */ @Component public class MediaTaskManager { private final BlockingQueue queue = new LinkedBlockingQueue<>(); private final ExecutorService executor = Executors.newSingleThreadExecutor(); private final ObjectMapper mapper = new ObjectMapper() .registerModule(new JavaTimeModule()) .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); private final Path persistPath = Paths.get("media_task_queue.json"); private final Path persistTempPath = Paths.get("media_task_queue_temp.json"); private final Object lock = new Object(); // 锁对象 private volatile MediaTask currentTask = null; @Resource private CourseMediaServiceUtil courseMediaServiceUtil; @PostConstruct public void init() throws IOException { mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); loadQueueFromDisk(); startWorker(); } public int submitTask(MediaTask task) throws IOException { synchronized (lock) { queue.add(task); persistQueue(); // 立即持久化 return queue.size(); } } public int getQueuePosition(Long taskId) { int pos = 1; if (currentTask != null && currentTask.getId().equals(taskId)) return 0; for (MediaTask task : queue) { if (task.getId().equals(taskId)) return pos; pos++; } return -1; } private void startWorker() { executor.submit(() -> { while (true) { try { currentTask = queue.take(); // 开始处理 persistQueue(); System.out.println("开始合成:" + currentTask.getId()); courseMediaServiceUtil.remoteMegerMedia(currentTask.getReq()); synchronized (lock) { currentTask = null; persistQueue(); // 执行完再持久化 } } catch (Exception e) { e.printStackTrace(); // 如果异常了,重新入队 if (currentTask != null) { queue.add(currentTask); currentTask = null; } persistQueue(); } } }); } private void persistQueue() { synchronized (lock) { try { QueueState state = new QueueState(); state.current = currentTask; state.queue = new ArrayList<>(queue); mapper.writeValue(persistTempPath.toFile(), state); Files.move(persistTempPath, persistPath, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE); } catch (IOException e) { e.printStackTrace(); } } } private void loadQueueFromDisk() { synchronized (lock) { if (Files.exists(persistPath)) { try { QueueState state = mapper.readValue(persistPath.toFile(), QueueState.class); if (state.current != null) { queue.add(state.current); } if (state.queue != null) { queue.addAll(state.queue); } } catch (IOException e) { e.printStackTrace(); } } } } @PreDestroy public void shutdown() { executor.shutdownNow(); persistQueue(); } // 内部类用于 JSON 映射 public static class QueueState { public MediaTask current; public List queue; } }