先说官方文档要求:在微信官方文档中有要求5秒内做出响应的要求,如下图所示:
官方官方文档中的对回复用户消息的处理,官方给出了两种方式:
- 1、在微信回调我们接口时,直接 response 响应消息内容,这样可以直接完成响应用户消息。
- 2、在微信回调我们接口时,直接 response 响应 success 字符串,然后在官方限定的一定时间内,我们可以主动调用微信的接口给用户发送消息。
结合这两点内容我们能很显然的得出结论:如果我们可以通过1来完成那绝对不做2,毕竟2多了一次主动调用微信接口的操作,在一定程度上加大了我们系统的资源消耗。
但是在实际业务中,即便是我们的业务处理,正常情况不会超过5秒这么久,但是系统总有可能有例外情况,我们总要考虑可能出现哪怕1%可能超出5秒的情况。
本着对编码精益求精的态度,我的做法是做精准时间处理,我以4秒为限,如果4秒以内可以处理完自己的业务逻辑,则直接返回对应需要响应给用户的消息,反之直接返回success。
我的业务场景:在接受到微信回调过来的用户消息后,我需要通过HTTP调用一个AI平台自动计算需要响应给用户的消息。也就是说,只要我调用的目标系统正常,我是肯定可以在4秒之内获得结果直接通过 response 返回给微信。除非 AI 系统出现异常这种一般不会出现的特殊情况出现。所以结论就是:99%的情况我不需要再触发一次主动调用微信的接口来二次响应消息。)
如何精准的控制这个4秒时间,我是通过控制主线程和子线程的方式来处理的。整体来说就是主线程先等待4秒,如果4秒钟内子线程提前计算出响应的消息,则立刻让主线程恢复继续执行把消息直接响应给用户。如果主线程等待4秒后,因为调用AI超时等原因子线程没有计算出需要响应的消息,则主线程不再继续等待,直接响应 sucess 字符串。
以下我写了两份代码,都可以完成这个方案,一个是两个线程直接通过原生的方式控制,一个是利用了线程池的 Future 超时调用完成的,两个代码类具体代码如下:
package com.example.springbootdemo1;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import java.util.concurrent.TimeUnit;
/**
* ThreadDemo
*
* @author 单红宇
* @date 2023/10/28 17:44
*/
@Slf4j
public class ThreadDemo {
/**
* main入口
*
* @param args args
*/
public static void main(String[] args) {
Thread mainThread = Thread.currentThread();
ChildThread childThread = new ChildThread(mainThread);
try {
// 传入HttpServletResponse变量,模拟response输出
Logger response = log;
childThread.execute(2000, response);
// 主线程休眠5秒
childThread.setMainThreadWaiting(true);
TimeUnit.MILLISECONDS.sleep(4000);
childThread.setMainThreadWaiting(false);
response.info("response输出success");
} catch (InterruptedException e) {
childThread.setMainThreadWaiting(false);
mainThread.interrupt();
}
}
}
/**
* ChildThread
*
* @author 单红宇
*/
@Slf4j
class ChildThread {
/**
* 主线程是否等待状态,true=等待中
*/
private volatile boolean mainThreadWaiting = false;
/**
* 主线程
*/
private final Thread mainThread;
public ChildThread(Thread mainThread) {
this.mainThread = mainThread;
}
/**
* 这里方法里new Thread是模拟一个@Async注解修饰的异步方法
*
* @param time time
* @param response response
*/
public void execute(final long time, Logger response) {
new Thread(() -> {
try {
// 子线程睡眠模拟网络调用
TimeUnit.MILLISECONDS.sleep(time);
if (this.isMainThreadWaiting() && mainThread.getState() == Thread.State.TIMED_WAITING) {
mainThread.interrupt();
// 输出结果
response.info("response直接输出结果给用户");
} else {
log.info("单独调用接口发送消息给用户");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("子线程异常", e);
}
}).start();
}
public boolean isMainThreadWaiting() {
return mainThreadWaiting;
}
public void setMainThreadWaiting(boolean mainThreadWaiting) {
this.mainThreadWaiting = mainThreadWaiting;
}
}
package com.example.springbootdemo1;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* FutureThreadDemo
*
* @author 单红宇
* @date 2023/10/28 17:44
*/
@Slf4j
public class FutureThreadDemo {
/**
* 注入springboot默认的线程池,这里new是模拟注入,在springboot环境中可以直接注入该对象
*/
// @Autowired
private static ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
static {
// 本例为Demo,随便设置
threadPoolTaskExecutor.setCorePoolSize(5); // 设置核心线程数
threadPoolTaskExecutor.setMaxPoolSize(10); // 设置最大线程数
threadPoolTaskExecutor.setQueueCapacity(100); // 设置队列容量
threadPoolTaskExecutor.initialize();
}
/**
* main
*
* @param args args
*/
public static void main(String[] args) throws InterruptedException {
MyCallable myCallable = new MyCallable(Thread.currentThread());
Future<String> future = threadPoolTaskExecutor.submit(myCallable);
String responseStr = "success";
try {
myCallable.setMainThreadWaiting(true);
String invokeResult = future.get(4000, TimeUnit.MILLISECONDS);
myCallable.setMainThreadWaiting(false);
if (invokeResult != null) {
responseStr = invokeResult;
}
} catch (InterruptedException e) {
myCallable.setMainThreadWaiting(false);
Thread.currentThread().interrupt();
log.error("InterruptedException", e);
} catch (ExecutionException | TimeoutException e) {
myCallable.setMainThreadWaiting(false);
log.error("ExecutionException | TimeoutException", e);
} finally {
log.info("response >>> {}", responseStr);
}
// 如果是spring环境中注入的对象,请不要下面的代码,因为threadPoolTaskExecutor由spring托管
TimeUnit.SECONDS.sleep(6);
threadPoolTaskExecutor.shutdown();
}
}
/**
* MyCallable
*
* @author 单红宇
*/
@Slf4j
class MyCallable implements Callable<String> {
/**
* mainThread
*/
private final Thread mainThread;
/**
* 主线程是否等待状态,true=等待中
*/
private volatile boolean mainThreadWaiting = false;
public MyCallable(Thread mainThread) {
this.mainThread = mainThread;
}
@Override
public String call() throws Exception {
// 模拟业务调用的执行时间
TimeUnit.MILLISECONDS.sleep(1000);
if (this.isMainThreadWaiting() && mainThread.getState() == Thread.State.TIMED_WAITING) {
// 输出结果
return "<xml>invoke result</xml>";
} else {
log.info("单独调用接口发送消息给用户");
return null;
}
}
public boolean isMainThreadWaiting() {
return mainThreadWaiting;
}
public void setMainThreadWaiting(boolean mainThreadWaiting) {
this.mainThreadWaiting = mainThreadWaiting;
}
}
以上两份代码,我都做了测试,应该没有问题。如果你发现问题或者有疑问,可以及时在文章底部留言给我。
(END)