package com.xmzs.midjourney.service; import cn.hutool.cache.CacheUtil; import cn.hutool.cache.impl.TimedCache; import cn.hutool.core.exceptions.CheckedUtil; import cn.hutool.core.text.CharSequenceUtil; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.xmzs.midjourney.Constants; import com.xmzs.midjourney.ProxyProperties; import com.xmzs.midjourney.enums.TaskStatus; import com.xmzs.midjourney.support.Task; import lombok.extern.slf4j.Slf4j; import org.springframework.http.HttpEntity; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Service; import org.springframework.web.client.RestTemplate; import java.time.Duration; @Slf4j @Service public class NotifyServiceImpl implements NotifyService { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private final ThreadPoolTaskExecutor executor; private final TimedCache taskLocks = CacheUtil.newTimedCache(Duration.ofHours(1).toMillis()); public NotifyServiceImpl(ProxyProperties properties) { this.executor = new ThreadPoolTaskExecutor(); this.executor.setCorePoolSize(properties.getNotifyPoolSize()); this.executor.setThreadNamePrefix("TaskNotify-"); this.executor.initialize(); } @Override public void notifyTaskChange(Task task) { String notifyHook = task.getPropertyGeneric(Constants.TASK_PROPERTY_NOTIFY_HOOK); if (CharSequenceUtil.isBlank(notifyHook)) { return; } String taskId = task.getId(); TaskStatus taskStatus = task.getStatus(); Object taskLock = this.taskLocks.get(taskId, (CheckedUtil.Func0Rt) Object::new); try { String paramsStr = OBJECT_MAPPER.writeValueAsString(task); this.executor.execute(() -> { synchronized (taskLock) { try { ResponseEntity responseEntity = postJson(notifyHook, paramsStr); if (responseEntity.getStatusCode() == HttpStatus.OK) { log.debug("推送任务变更成功, 任务ID: {}, status: {}, notifyHook: {}", taskId, taskStatus, notifyHook); } else { log.warn("推送任务变更失败, 任务ID: {}, notifyHook: {}, code: {}, msg: {}", taskId, notifyHook, responseEntity.getStatusCodeValue(), responseEntity.getBody()); } } catch (Exception e) { log.warn("推送任务变更失败, 任务ID: {}, notifyHook: {}, 描述: {}", taskId, notifyHook, e.getMessage()); } } }); } catch (JsonProcessingException e) { log.warn("推送任务变更失败, 任务ID: {}, notifyHook: {}, 描述: {}", taskId, notifyHook, e.getMessage()); } } private ResponseEntity postJson(String notifyHook, String paramsJson) { HttpHeaders headers = new HttpHeaders(); headers.setContentType(MediaType.APPLICATION_JSON); HttpEntity httpEntity = new HttpEntity<>(paramsJson, headers); return new RestTemplate().postForEntity(notifyHook, httpEntity, String.class); } }