目录
- 隔离概念
- 常见的隔离策略
- 基于Servlet 3实现请求隔离
- 基于DeferredResult实现Http长轮询监听配置和微信扫码登录
- 隔离术相关拓展
推荐:阅读下自定义配置解析类:WebAsyncBeanDefinitionRegistryPostProcessor,希望能有所收获。
隔离概念
将系统或资源相互分开,发生故障时,限定传播范围和影响范围,以防拖垮整个系统/服务。
常见的隔离策略
线程隔离:主要指线程池的隔离,主要分为异步请求和异步处理。异步请求:根据业务进行请求分类,交给不同的线程池处理。当一个业务请求出现问题时,不会影响到其它线程池,从而保证了其他服务可用性。异步处理:对一些不重要的业务流程(消息通知,日志记录),可以用线程池异步处理从主业务流程中剥离出来;对于多策略业务流程处理(微信支付,支付宝支付),可以用线程池隔离各个策略处理,提高可用性。
服务隔离:实际工作中,会有多个需求同时进行,在联调和测试时,会部署多个版本的服务,通过分组和版本号来隔离不同需求的服务。
机房隔离:系统为了应对高可用的要求,会进行多机房部署,当一个机房出问题后,通过把流量切到另一个机房,来保证系统的可用性。
读写隔离:通过主从模式将读和写集群隔离,例如:数据库的读写分离,redis主从同步。
热点隔离:对于读热点,通过多级缓存来处理;对于写热点,一般通过缓存+队列模式削峰处理。
基于Servlet 3实现请求隔离
我们将从如下几点了解Servlet3异步化:为什么实现异步化需要Servlet3?请求异步化的好处?哪些容器支持Servlet3?如何使用Servlet3处理请求异步化?
为什么实现异步化需要Servlet3?Servlet3开始支持请求异步化,可以将请求解析和业务处理进行分离。
Tomcat处理Http请求流程分为:
1.接收请求并解析成HttpServletRequest,
2.交给Servlet进行业务处理,
3.通过HttpServletResponse写出响应。
Servlet2请求处理流程:servlet线程负责请求解析,业务处理和响应数据生成整个过程,一直处于阻塞状态。
Servlet2请求处理流程
Servlet3异步请求处理流程:tomcat线程只负责请求解析,把业务处理和响应数据生成交给业务线程处理,servlet线程归还到容器,来处理其他请求,提高了web容器的并发能力。
Servlet3异步请求处理流程
异步化的好处:
- 请求解析和业务处理分离,提高Tomcat容器并发处理能力。
- 根据业务重要性进行分级,定义不同业务线程池来处理,业务之间相互隔离,如图所示:
业务线程池隔离
支持Servlet3的容器:
Tomcat 7,JBOSS 6.0.0 ,Weblogic 12c,WebSphere 8.0 ,Jetty 8.0,Glassfish 3.0,Resin 4.0.4及以上
具体代码:
controller层:
@GetMapping(value = "/async/servlet")
public void asyncServlet(HttpServletRequest request, HttpServletResponse response){
System.out.println("外部线程1:" + Thread.currentThread().getName());
WebAsyncTaskExecutor.submit(request, ()->{
System.out.println("内部线程:" + Thread.currentThread().getName());
return "执行成功";
});
System.out.println("外部线程2:" + Thread.currentThread().getName());
}
异步任务执行器:
@Slf4j
public class WebAsyncTaskExecutor {
private static long DefaultTimeOut = 60 * 1000L;
private DeferredResult Default = new DeferredResult();
public static <R> DeferredResult<R> submit(Supplier<R> supplier) {
return submit(supplier,defaultDeferredResult());
}
public static <R> DeferredResult<R> submit(Supplier<R> supplier, DeferredResult<R> deferredResult) {
ThreadPoolExecutor threadPoolExecutor = WebAsyncPoolManager.get();
if (threadPoolExecutor == null) {
R result = supplier.get();
deferredResult.setResult(result);
return deferredResult;
}
threadPoolExecutor.execute(()->{
R result = supplier.get();
deferredResult.setResult(result);
});
return deferredResult;
}
public static <R> void submit(HttpServletRequest request, Supplier<R> supplier) {
submit(request,supplier,null);
}
public static <R> void submit(HttpServletRequest request, Supplier<R> supplier, AsyncListener asyncListener) {
AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(DefaultTimeOut);
if (asyncListener != null) {
asyncContext.addListener(asyncListener);
}
ThreadPoolExecutor threadPoolExecutor = WebAsyncPoolManager.get();
if (threadPoolExecutor == null) {
R result = supplier.get();
wirte(asyncContext,result);
return;
}
threadPoolExecutor.execute(()->{
R result = supplier.get();
wirte(asyncContext,result);
});
}
private static DeferredResult defaultDeferredResult(){
DeferredResult deferredResult = new DeferredResult(DefaultTimeOut);
deferredResult.onTimeout(new Runnable() {
@Override
public void run() {
deferredResult.setResult(BaseResponse.error(ErrorMessage.TIMEOUT_ERROR));
}
});
return deferredResult;
}
private static void wirte(AsyncContext asyncContext, Object result) {
HttpServletRequest req = (HttpServletRequest) asyncContext.getRequest();
HttpServletResponse resp = (HttpServletResponse) asyncContext.getResponse();
try {
ResponseUtil.responseJson(req,resp,HttpServletResponse.SC_OK,result);
} catch (Exception ex) {
log.error("wirte error!",ex);
resp.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
} finally {
asyncContext.complete();
}
}
}
自定义配置解析类:
#yml文件的相关配置
spring:
webasync:
pools[0]:
name: test
corePoolSize: 5
maxPoolSize: 5
keepAliveTime: 30
queueSize: 1024
pools[1]:
name: test1
corePoolSize: 5
maxPoolSize: 5
keepAliveTime: 30
queueSize: 1024
@Slf4j
public class WebAsyncBeanDefinitionRegistryPostProcessor implements BeanDefinitionRegistryPostProcessor, ApplicationContextAware {
private ApplicationContext applicationContext;
@Override
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
log.info("=======WebAsyncBeanDefinitionRegistryPostProcessor start ======");
Environment environment = this.applicationContext.getEnvironment();
BindResult<WebAsyncConfig> bindResult = Binder.get(environment).bind(WebAsyncConfig.PREFIX, Bindable.of(WebAsyncConfig.class));
if (bindResult == null || !bindResult.isBound()) {
log.info("WebAsyncBeanDefinitionRegistryPostProcessor bindResult is null");
return;
}
WebAsyncConfig webAsyncConfig = bindResult.get();
log.info("WebAsyncBeanDefinitionRegistryPostProcessor webAsyncConfig : {}",JSON.toJSONString(webAsyncConfig));
if (webAsyncConfig == null || webAsyncConfig.getPools() == null || webAsyncConfig.getPools().size() <= 0) {
log.error("WebAsyncBeanDefinitionRegistryPostProcessor webasync config error! current config:{}", JSON.toJSONString(webAsyncConfig));
return;
}
for (int i = 0; i < webAsyncConfig.getPools().size() ; i++) {
ThreadPoolConfig threadPoolConfig = webAsyncConfig.getPools().get(i);
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(threadPoolConfig.getCorePoolSize()
, threadPoolConfig.getMaxPoolSize()
, threadPoolConfig.getKeepAliveTime(), TimeUnit.SECONDS
, new ArrayBlockingQueue<Runnable>(threadPoolConfig.getQueueSize())
, new ThreadFactoryBuilder().setNameFormat("webasync-pool-"+ threadPoolConfig.getName() +"-%d").build());
WebAsyncPoolManager.put(i,threadPoolExecutor);
}
}
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}
}
基于DeferredResult实现Http长轮询拉取配置和微信扫码登录
Http长轮询监听配置的流程:
- client端发送http请求到server端
- server端hold住http请求连接,并设置超时
- 超时之内,有变化立即发送client端,否则,直到超时断开连接,继续重复上面步骤
关键在于:server端怎么hold住http请求?变化后如何找到对应的请求?完成和超时后如何处理?
server端怎么hold住http请求? 用Map存放请求(DeferredResult),key:唯一id,val:DeferredResult
变化后如何找到对应的请求? 通过唯一id来找到对应的请求
完成和超时后如何处理?需要将DeferredResult从Map中移除
具体代码:
定义的Map容器:
/**
* map容器
*/
private static final ConcurrentHashMap<String, CopyOnWriteArrayList<DeferredResult>> deferredResultMultimaps =
new ConcurrentHashMap<>();
请求设置超时时间、完成事件和超时事件,并添加到Map中
//设置超时时间
final DeferredResult<ResponseEntity<String>> result = new DeferredResult<>(DefaultDeferredResultProcessor.CONNECTION_TIMEOUT
,defaultDeferredResultProcessor.getNotModifiedResponse());
//当配置有变动时,直接响应客户端
if(modifyConfigResult != null && HttpStatus.OK.equals(modifyConfigResult.getStatusCode())
&& StringUtils.hasLength(modifyConfigResult.getBody())){
result.setResult(modifyConfigResult);
}
else{
//设置长轮询超时回调方法,记录超时请求参数
result.onTimeout(new Runnable() {
@Override
public void run() {
for (int i=0; i< configKeyList.size();i++){
ConfigInfoVO configKey = configKeyList.get(i);
String key = makeConfigKey(configKey.getDataId(),configKey.getGroup());
defaultDeferredResultProcessor.removeDeferredResult(key,result);
}
}
});
//设置长轮询完成回调方法,从deferredResults集合中剔除相应的异步返回对象
result.onCompletion(new Runnable() {
@Override
public void run() {
for (int i=0; i< configKeyList.size();i++){
ConfigInfoVO configKey = configKeyList.get(i);
String key = makeConfigKey(configKey.getDataId(),configKey.getGroup());
defaultDeferredResultProcessor.removeDeferredResult(key,result);
}
}
});
for (int i=0; i< configKeyList.size();i++){
ConfigInfoVO configKey = configKeyList.get(i);
String key = makeConfigKey(configKey.getDataId(),configKey.getGroup());
//添加到Map容器
defaultDeferredResultProcessor.addDeferredResult(key,result);
}
配置有变化,立即响应client端
public <T> void notifyDeferredResult(String key, final T result) {
//根据key:dataId/group获取当前所有的异步处理对象集合
CopyOnWriteArrayList<DeferredResult> currentDeferredResultList = deferredResultMultimaps.get(key);
if (null == currentDeferredResultList || currentDeferredResultList.size() <= 0){
LoggerUtil.info(getClass(),"notifyDeferredResult have not DeferredResults key:{0},result:{1}.",key,JSON.toJSONString(result));
return;
}
final CopyOnWriteArrayList<DeferredResult> lstDeferredResult = Lists.newCopyOnWriteArrayList(currentDeferredResultList);
//当待处理的异步返回对象超过一定批次后,会有线程池来异步处理
if(lstDeferredResult.size() > CHANGE_CONFIG_BATCH_SIZE){
changeConfigBatchExecutorService.submit(new Runnable() {
@Override
public void run() {
for (int i = 0; i < lstDeferredResult.size(); i++){
//当超过批次后,休息一段时间
if (i > 0 && i % CHANGE_CONFIG_BATCH_SIZE==0){
try {
TimeUnit.MILLISECONDS.sleep(CHANGE_CONFIG_BATCH_INTERVALTIME);
} catch (InterruptedException e) {
//ignore
}
}
lstDeferredResult.get(i).setResult(new ResponseEntity<T>(result, HttpStatus.OK));
}
}
});
}
for (int i = 0; i < lstDeferredResult.size(); i++){
lstDeferredResult.get(i).setResult(new ResponseEntity<T>(result,HttpStatus.OK));
}
}
隔离术相关拓展
- web多环境部署配置隔离
- Nacos配置命名空间隔离
- Docker系统资源隔离
- 账户角色权限隔
相关阅读:
架构篇-一分钟掌握高可用架构
架构篇-一分钟掌握可伸缩架构
架构篇-一分钟掌握可扩展架构
架构篇-一分钟掌握性能优化小技巧
架构-一分钟掌握架构师职业规划
本文来自陌颜投稿,不代表胡巴网立场,如若转载,请注明出处:https://www.hu85.com/171373.html
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 xxxxx@qq.com 举报,一经查实,本站将立刻删除。