当前位置: 首页 > news >正文

佛山网站建设是哪个b2b和b2c是什么意思

佛山网站建设是哪个,b2b和b2c是什么意思,mip手机网站模板,公众号用什么软件制作最好一、Dubbo线程模型 首先明确一个基本概念:IO 线程和业务线程的区别 IO 线程:配置在netty 连接点的用于处理网络数据的线程,主要处理编解码等直接与网络数据 打交道的事件。 业务线程:用于处理具体业务逻辑的线程,可以…

一、Dubbo线程模型

首先明确一个基本概念:IO 线程和业务线程的区别
IO 线程:配置在netty 连接点的用于处理网络数据的线程,主要处理编解码等直接与网络数据
打交道的事件。
业务线程:用于处理具体业务逻辑的线程,可以理解为自己在provider 上写的代码所执行的线
程环境。

Dubbo 默认采用的是长链接的方式,即默认情况下一个consumer 和一个provider 之间只会建立
一条链接,这种情况下IO 线程的工作就是编码和解码数据,监听具体的数据请求,直接通过Channel发布数据等等;

有两个参数⽤来配置服务消费者和服务提供者直接的socket连接个数:

1. shareconnections:表示可共享的socket连接个数
2. connections:表示不共享的socket连接个数

服务A的shareconnections或者connections为2时,服务A的消费者会向服务A的提供者建⽴两个socket连接:

业务线程就是处理IO 线程处理之后的数据,业务线程并不知道任何跟网络相关的内容,只是纯
粹的处理业务逻辑,在业务处理逻辑的时候往往存在复杂的逻辑,所以业务线程池的配置往往都要
比IO 线程池的配置大很多。
IO 线程部分,Netty 服务提供方NettyServer 又使用了两级线程池,master 主要用来接受客户
端的链接请求,并把接受的请求分发给worker 来处理。整个过程如下图:

IO 线程与业务线程的交互如下:

IO 线程的派发策略:

默认是all:所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,心跳等。即worker 线程接收到事件后,将该事件提交到业务线程池中,自己再去处理其他IO 事件。
direct:worker 线程接收到事件后,由worker 执行到底。
message:只有请求响应消息派发到线程池,其它连接断开事件,心跳等消息,直接在IO 线程上执行
execution:只有请求消息派发到线程池,不含响应(客户端线程池),响应和其它连接断开事件,心跳等消息,直接在IO线程上执行
connection:在IO 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到线程池。

业务线程池设置:
fixed:固定大小线程池,启动时建立线程,不关闭,一直持有。(缺省)
coresize:200
maxsize:200
队列:SynchronousQueue
回绝策略:AbortPolicyWithReport - 打印线程信息jstack,之后抛出异常
cached:缓存线程池,空闲一分钟自动删除,需要时重建。
limited:可伸缩线程池,但池中的线程数只会增长不会收缩。只增长不收缩的目的是为了避免收缩时突然来了大流量引起的性能问题。

配置示例:

<dubbo:protocol name="dubbo"dispatcher="all"threadpool="fixed"threads="100"/>

在整个消费者调用过程中,各个线程池都比较重要,其中比较有特色的就是AllChannelHandler,它完成了IO线程转向用户线程的任务转移,比较关键。 

二、派发策略(All)源码解析

消费者启动的时候会执行DubboProtocol#initClient建议与服务端端的socket连接

private ExchangeClient initClient(URL url) {ExchangeClient client;try {// Replace InstanceAddressURL with ServiceConfigURL.url = new ServiceConfigURL(DubboCodec.NAME, url.getUsername(), url.getPassword(), url.getHost(), url.getPort(), url.getPath(),  url.getParameters());// connection should be lazyif (url.getParameter(LAZY_CONNECT_KEY, false)) {client = new LazyConnectExchangeClient(url, requestHandler);} else {client = Exchangers.connect(url, requestHandler);}} catch (RemotingException e) {throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);}return client;
}
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}

Transporters#connect最终返回的是NettyClient,点进这个对象的构造方法

public Client connect(URL url, ChannelHandler handler) throws RemotingException {return new NettyClient(url, handler);
}
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {// you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.// the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handlersuper(url, wrapChannelHandler(url, handler));
}

NettyClient#wrapChannelHandler中再次利用SPI机制获取线程派发策略,dubbo默认的策略为allDispatcher策略。

protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {return new MultiMessageHandler(new HeartbeatHandler(url.getOrDefaultFrameworkModel().getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url)));
}
public ChannelHandler dispatch(ChannelHandler handler, URL url) {return new AllChannelHandler(handler, url);
}

 除了默认的AllDispatcher,还有DirectDispatcher,MessageOnlyDispatcher等。

 当消费端接收到远程服务端的响应之后,按照Netty的处理流程,消息会在channel绑定的handler上传递,netty底层会调用handler#received。可以看到connected,disconnected,received,caught等方法都是在新的线程池ExecutorService中执行,executor.execute方法会将任务ChannelEventRunnable提交到ExecutorService中。

public class AllChannelHandler extends WrappedChannelHandler {public AllChannelHandler(ChannelHandler handler, URL url) {super(handler, url);}@Overridepublic void connected(Channel channel) throws RemotingException {ExecutorService executor = getSharedExecutorService();try {executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));} catch (Throwable t) {throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);}}@Overridepublic void disconnected(Channel channel) throws RemotingException {ExecutorService executor = getSharedExecutorService();try {executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));} catch (Throwable t) {throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);}}@Overridepublic void received(Channel channel, Object message) throws RemotingException {ExecutorService executor = getPreferredExecutorService(message);try {executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));} catch (Throwable t) {if(message instanceof Request && t instanceof RejectedExecutionException){sendFeedback(channel, (Request) message, t);return;}throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);}}@Overridepublic void caught(Channel channel, Throwable exception) throws RemotingException {ExecutorService executor = getSharedExecutorService();try {executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));} catch (Throwable t) {throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);}}
}

由于AllChannelHandler方法是在前面handler的基础上包装了一层,所以ChannelEventRunnable中会将消息传递给AllHandlel的下一个handler,从这里也清晰的看到了AllChannelHandler完成了IO线程转向用户线程的任务转移。

public void run() {if (state == ChannelState.RECEIVED) {try {handler.received(channel, message);} catch (Exception e) {logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel+ ", message is " + message, e);}} else {switch (state) {case CONNECTED:try {handler.connected(channel);} catch (Exception e) {logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);}break;case DISCONNECTED:try {handler.disconnected(channel);} catch (Exception e) {logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);}break;case SENT:try {handler.sent(channel, message);} catch (Exception e) {logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel+ ", message is " + message, e);}break;case CAUGHT:try {handler.caught(channel, exception);} catch (Exception e) {logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel+ ", message is: " + message + ", exception is " + exception, e);}break;default:logger.warn("unknown state: " + state + ", message is " + message);}}}

三、AllDispatcher策略异常超时问题

Dubbo有一个经典问题,就是当配置了消息派发策略为AllDispatcher时,当服务端线程池满了之后,当消费端再次发送请求,就会一直傻傻等待超时导致没有任何服务端响应。那么问题就出现在AllChannelHandler,前面已经说了AllDispatcher策略就是所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,心跳等。即worker 线程接收到事件后,将该事件提交到业务线程池中,自己再去处理其他IO 事件。

问题出现原因:

那么当服务端线程池打满之后,此时又再次来了一个请求,此时依然会提交给线程池执行,那么了解线程池原理的就清楚线程池任务满了之后会执行拒绝策略抛出RejectedExecutionException异常,此时就会进入到received的catch方法中去,然后就又再次抛出ExecutionException异常。

            
public void received(Channel channel, Object message) throws RemotingException {ExecutorService executor = getPreferredExecutorService(message);try {executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));} catch (Throwable t) {throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);}
}

那么抛出的异常就又会被netty捕获,进而继续执行nettyHandler的caught方法,可以看到这里又再次将任务丢到了线程池中。但是此时线程池依然是满的,业务线程池所有线程都堵住了,所以也不能将异常消息返回给客户端,然后客户端消费者只能傻傻等到超时。

public void caught(Channel channel, Throwable exception) throws RemotingException {ExecutorService executor = getSharedExecutorService();try {executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));} catch (Throwable t) {throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);}
}

解决办法可以设置dispatcher为message,只有请求和响应交给业务线程池处理,其他的在IO线程处理,配置如下:

<dubbo:protocol name="dubbo" dispatcher="message" />

后面dubbo也修复了这个问题,再received方法的catch中新加了一部分逻辑,注释的大致意思也就是说:修复当线程池满了之后异常信息无法被发送给消费端的问题(当线程池满了,拒绝执行任务,会引起消费端等待超时),所以代码中判断了下当抛出异常为RejectedExecutionException时,就不把异常抛出交给AllChannelHandler#caught方法中的线程池执行,而是直接用IO线程在通过channel将消息及时反馈给消费者,消费者也就会收到服务端的“threadpool is exhausted ,detail msg”等响应消息。

public void received(Channel channel, Object message) throws RemotingException {ExecutorService cexecutor = getExecutorService();try {cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));} catch (Throwable t) {//TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. //fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time outif(message instanceof Request && t instanceof RejectedExecutionException){Request request = (Request)message;if(request.isTwoWay()){String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();Response response = new Response(request.getId(), request.getVersion());response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);response.setErrorMessage(msg);channel.send(response);return;}}throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);}
}

http://www.mmbaike.com/news/68733.html

相关文章:

  • 绵阳网站建设费用北京搜索引擎优化主管
  • 凡科的模板做网站平台做推广的技巧
  • 网络营销托管服务商指的是成都网站seo服务
  • 咸阳做网站公司百度推广如何计费
  • 网站统一做301百度推广开户代理
  • 湖南省建设监理协会网站app推广30元一单平台
  • 找人做网站被骗 公安不管百度广告怎么收费
  • 高端网站建设公司价格疫情最新消息今天
  • 昆明网站建设教学视频志鸿优化网下载
  • 南京网站开发南京乐识优百度搜索引擎收录入口
  • 网站规划建设与管理维护大学论文百度学术官网
  • 品牌建设网站公司排名全网营销系统
  • 软件测试怎么学谷歌seo推广招聘
  • 做电视的视频网站石家庄网站建设方案
  • 网站建设类的论文题目免费制作自己的网页
  • 酒店网站建设方案策划最近军事新闻热点大事件
  • 上海学做网站今天新闻
  • 信誉楼线上商城小程序高端网站优化公司
  • 南昌专门做网站的人电子商务网站建设案例
  • 英文建站模板品牌营销策略案例
  • ps网站子页怎么做黑马培训是正规学校吗
  • wordpress模板页面seo平台怎么样
  • 怎么制作微信公众号文章百度seo优化技巧
  • 网站做直播吗企业网站seo哪里好
  • 休闲网站建设网站推广的方式有
  • 网站用什么系统网络营销推广手段
  • 网站升级正在升级维护seo排名策略
  • 做网站的都是什么专业毕业的站长之家网站介绍
  • 良品铺子网络营销策划书杭州seo排名优化
  • 太原网站建设技术托管seo一个月赚多少钱