DataX Source Code Analysis: TaskGroupContainer

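Series Contents

1. DataX in Depth and Architecture Overview
2. DataX Source Code Analysis: JobContainer
3. DataX Source Code Analysis: TaskGroupContainer
4. DataX Source Code Analysis: TaskExecutor
5. DataX Source Code Analysis: reader
6. DataX Source Code Analysis: writer
7. DataX Source Code Analysis: Channel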


Contents

  • Series Contents
  • TaskGroupContainer
  • Initialization
  • Detailed steps of the start method
    • 1. Initialize task-execution state
    • 2. Run tasks in the while (true) loop
  • TaskGroupContainer source code


TaskGroupContainer

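DataX's TaskGroupContainer is the container that runs the tasks JobContainer has assigned to one TaskGroup. Its main entry point is the start method, which does two things: it initializes the state used to track task execution, and then loops, checking the execution status of every task. TaskGroupContainer also has a method named reportTaskGroupCommunication that reports the group's status: it collects the Communication of every task belonging to this TaskGroupContainer and merges them into a single Communication.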
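To make the merge step concrete, here is a minimal Java sketch. `TaskComm` is a hypothetical, heavily simplified stand-in for DataX's `Communication`: it holds a state and a map of named counters. The merge sums the counters and assumes a simple state precedence (FAILED over RUNNING over SUCCEEDED). DataX's own merge rules and fields differ in detail, and the real reportTaskGroupCommunication also derives the reported value from the previous snapshot via `CommunicationTool.getReportCommunication`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for DataX's Communication (not the real class).
class TaskComm {
    enum State { RUNNING, SUCCEEDED, FAILED }

    final Map<String, Long> counters = new HashMap<>();
    State state;

    TaskComm(State state) {
        this.state = state;
    }

    // Add delta to a named counter, creating it if absent.
    TaskComm count(String key, long delta) {
        counters.merge(key, delta, Long::sum);
        return this;
    }

    // Merge task-level communications into one group-level view.
    // Counters are summed; the state precedence FAILED > RUNNING > SUCCEEDED
    // is an assumption made for this sketch.
    static TaskComm mergeAll(List<TaskComm> tasks) {
        TaskComm merged = new TaskComm(State.SUCCEEDED);
        for (TaskComm t : tasks) {
            t.counters.forEach((k, v) -> merged.count(k, v));
            if (t.state == State.FAILED) {
                merged.state = State.FAILED;
            } else if (t.state == State.RUNNING && merged.state != State.FAILED) {
                merged.state = State.RUNNING;
            }
        }
        return merged;
    }
}
```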


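Initialization

  • Set the Configuration
  • Initialize monitoring by installing a StandaloneTGContainerCommunicator as the container communicator
  • Set jobId
  • Set taskGroupId
  • Set channelClazz, the class name of the Channel implementation
  • Set taskCollectorClass, the class used for the task plugin collector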
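These constructor steps amount to pulling a handful of values out of the configuration. The sketch below assumes a plain `Map<String, String>` in place of DataX's `Configuration` class; the key strings are modelled on the `CoreConstant` names in the source listing but should be treated as assumptions, and `TaskGroupSettings` is a hypothetical class.

```java
import java.util.Map;

// Hypothetical holder for the values the TaskGroupContainer constructor extracts.
class TaskGroupSettings {
    final long jobId;
    final int taskGroupId;
    final String channelClazz;
    final String taskCollectorClass;

    TaskGroupSettings(Map<String, String> conf) {
        // Key names are illustrative assumptions; DataX reads them via CoreConstant.
        this.jobId = Long.parseLong(required(conf, "core.container.job.id"));
        this.taskGroupId = Integer.parseInt(required(conf, "core.container.taskGroup.id"));
        this.channelClazz = required(conf, "core.transport.channel.class");
        this.taskCollectorClass = required(conf, "core.statistics.collector.plugin.taskClass");
    }

    // Fail fast when a mandatory key is missing.
    private static String required(Map<String, String> conf, String key) {
        String value = conf.get(key);
        if (value == null) {
            throw new IllegalArgumentException("missing config key: " + key);
        }
        return value;
    }
}
```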

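Detailed steps of the start method

1. Initialize task-execution state

  • taskConfigMap: a map from each taskId to its Configuration
  • taskQueue: the queue of tasks waiting to run
  • taskFailedExecutorMap: failed tasks, mapping each taskId to the executor of its last failed attempt
  • runTasks: the tasks currently running
  • taskStartTimeMap: the start time of each task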
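A stripped-down model of this bookkeeping follows. It assumes a hypothetical `TaskConf` class in place of `Configuration` and plain `Object` placeholders in place of `TaskExecutor`; it mirrors what buildTaskConfigMap and buildRemainTasks do, not their exact code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

// Simplified model of the state start() sets up before entering its loop.
class TaskGroupState {
    // Hypothetical stand-in for a task's Configuration.
    static final class TaskConf {
        final int taskId;

        TaskConf(int taskId) {
            this.taskId = taskId;
        }
    }

    final Map<Integer, TaskConf> taskConfigMap = new HashMap<>();       // taskId -> config
    final List<TaskConf> taskQueue = new LinkedList<>();                // tasks waiting to run
    final Map<Integer, Object> taskFailedExecutorMap = new HashMap<>(); // taskId -> last failed executor
    final List<Object> runTasks;                                        // executors currently running
    final Map<Integer, Long> taskStartTimeMap = new HashMap<>();        // taskId -> start time (ms)

    TaskGroupState(List<TaskConf> configs, int channelNumber) {
        for (TaskConf c : configs) {
            taskConfigMap.put(c.taskId, c);
            taskQueue.add(c); // every task starts out pending
        }
        // Sized for channelNumber; the scheduling loop enforces the actual limit.
        runTasks = new ArrayList<>(channelNumber);
    }
}
```

As in buildRemainTasks, the pending queue is a `LinkedList`: the scheduling loop removes started tasks through an iterator while walking the queue, which a linked list handles in constant time per removal.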

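2. Run tasks in the while (true) loop

1. Check task states
Iterate over all tasks. A task that has not finished is skipped; a finished task is removed from the running list. If a task failed, check whether it supports retry (failover); if it does and its attempt count is still below the maximum, shut down the old executor, reset the task's state, and put the task back on the queue. A task that failed with no retries left, or that was killed, marks the whole group as failed.
2. If the taskExecutors of this taskGroup have failed overall, report the status and throw an error.
3. While tasks are waiting and fewer tasks are running than the channel limit, create a TaskExecutor instance and call doStart to perform the actual data sync, then move the task from the pending list to the running list. A retried task is restarted only after taskRetryIntervalInMsec has passed since its last failure and its old executor has shut down; if the old executor is still alive after taskMaxWaitInMsec, the group fails. When a TaskExecutor is constructed, it creates a reader, a channel, and a writer, plus two threads; doStart starts them (writer first, then reader). The reader produces data into the channel and the writer consumes data from it; when the task completes, the writer marks the task state as succeeded.
4. Check the queue and the state of all tasks: if nothing is pending, every executor has finished, and all tasks succeeded, report the taskGroup's status and exit the loop.
5. If more than the report interval has passed since the last report, report immediately. The loop then sleeps for sleepIntervalInMillSec before the next pass.
6. After the loop exits, report the overall task state one final time.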
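The control flow of steps 1 to 6 can be traced with a single-threaded simulation. It is a sketch under strong simplifications: tasks are hypothetical integer IDs whose outcome per attempt is scripted in a `boolean[]`, a started task finishes on the next loop iteration, reporting is counted in iterations rather than milliseconds, and the retry interval (taskRetryIntervalInMsec) and shutdown wait (taskMaxWaitInMsec) that the real loop enforces are left out. The retry condition mirrors the source: a failed task is requeued only while its attempt count is below the maximum.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

// Single-threaded simulation of the start() scheduling loop (a sketch, not DataX code).
// outcomes.get(taskId)[attempt - 1] is the scripted result of that attempt:
// true = success, false = failure. Each array must cover every attempt that can run.
class LoopSimulation {
    static final class Running {
        final int taskId;
        final int attempt;

        Running(int taskId, int attempt) {
            this.taskId = taskId;
            this.attempt = attempt;
        }
    }

    final List<String> log = new ArrayList<>();

    // reportEvery must be > 0; it plays the role of reportIntervalInMillSec.
    boolean run(Map<Integer, boolean[]> outcomes, int channelNumber, int maxRetryTimes, int reportEvery) {
        LinkedList<Integer> taskQueue = new LinkedList<>(outcomes.keySet());
        Map<Integer, Integer> attempts = new HashMap<>(); // taskId -> attempts used so far
        List<Running> runTasks = new ArrayList<>();
        int tick = 0;
        while (true) {
            // 1. Collect finished tasks; requeue failures that may still be retried.
            boolean failed = false;
            for (Iterator<Running> it = runTasks.iterator(); it.hasNext(); ) {
                Running r = it.next();
                it.remove(); // in this model every running task finishes after one tick
                if (outcomes.get(r.taskId)[r.attempt - 1]) {
                    log.add("task " + r.taskId + " succeeded");
                } else if (r.attempt < maxRetryTimes) {
                    log.add("task " + r.taskId + " failed, retrying");
                    taskQueue.add(r.taskId);
                } else {
                    failed = true;
                }
            }
            // 2. A task that exhausted its retries fails the whole group.
            if (failed) {
                log.add("taskGroup failed");
                return false;
            }
            // 3. Start pending tasks while a channel is free.
            for (Iterator<Integer> it = taskQueue.iterator(); it.hasNext() && runTasks.size() < channelNumber; ) {
                int taskId = it.next();
                int attempt = attempts.merge(taskId, 1, Integer::sum);
                it.remove();
                runTasks.add(new Running(taskId, attempt));
                log.add("task " + taskId + " attempt " + attempt + " started");
            }
            // 4. Nothing pending and nothing running: the group is done.
            if (taskQueue.isEmpty() && runTasks.isEmpty()) {
                log.add("taskGroup completed");
                return true;
            }
            // 5. Periodic report, counted in loop iterations here.
            if (++tick % reportEvery == 0) {
                log.add("report at tick " + tick);
            }
        }
    }
}
```

With tasks `{1: [true], 2: [false, true]}` (iterated in ascending ID order), one channel and a retry limit of 2, task 1 starts and succeeds, task 2's first attempt fails and is requeued, its second attempt starts and succeeds, and the group completes. With the source's default taskMaxRetryTimes of 1, any failure ends the group.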

TaskGroupContainer source code

/**
 * Container that runs tasks
 **/
public class TaskGroupContainer extends AbstractContainer {
    private static final Logger LOG = LoggerFactory
            .getLogger(TaskGroupContainer.class);

    /**
     * jobId that the current taskGroup belongs to
     */
    private long jobId;

    /**
     * current taskGroupId
     */
    private int taskGroupId;

    /**
     * Channel class to use
     */
    private String channelClazz;

    /**
     * class used by the task collector
     */
    private String taskCollectorClass;

    private TaskMonitor taskMonitor = TaskMonitor.getInstance();

    public TaskGroupContainer(Configuration configuration) {
        super(configuration);

        initCommunicator(configuration);

        this.jobId = this.configuration.getLong(
                CoreConstant.DATAX_CORE_CONTAINER_JOB_ID);
        this.taskGroupId = this.configuration.getInt(
                CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID);

        this.channelClazz = this.configuration.getString(
                CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_CLASS);
        this.taskCollectorClass = this.configuration.getString(
                CoreConstant.DATAX_CORE_STATISTICS_COLLECTOR_PLUGIN_TASKCLASS);
    }

    private void initCommunicator(Configuration configuration) {
        super.setContainerCommunicator(new StandaloneTGContainerCommunicator(configuration));
    }

    public long getJobId() {
        return jobId;
    }

    public int getTaskGroupId() {
        return taskGroupId;
    }

    @Override
    public void start() {
        try {
            /**
             * Interval between status checks; kept short so tasks are dispatched to channels promptly
             */
            int sleepIntervalInMillSec = this.configuration.getInt(
                    CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_SLEEPINTERVAL, 100);
            /**
             * Interval between status reports; somewhat longer, to avoid excessive reporting
             */
            long reportIntervalInMillSec = this.configuration.getLong(
                    CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_REPORTINTERVAL,
                    10000);
            /**
             * Report performance statistics every 2 minutes
             */

            // Get the number of channels
            int channelNumber = this.configuration.getInt(
                    CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL);

            int taskMaxRetryTimes = this.configuration.getInt(
                    CoreConstant.DATAX_CORE_CONTAINER_TASK_FAILOVER_MAXRETRYTIMES, 1);

            long taskRetryIntervalInMsec = this.configuration.getLong(
                    CoreConstant.DATAX_CORE_CONTAINER_TASK_FAILOVER_RETRYINTERVALINMSEC, 10000);

            long taskMaxWaitInMsec = this.configuration.getLong(
                    CoreConstant.DATAX_CORE_CONTAINER_TASK_FAILOVER_MAXWAITINMSEC, 60000);

            List<Configuration> taskConfigs = this.configuration
                    .getListConfiguration(CoreConstant.DATAX_JOB_CONTENT);

            if (LOG.isDebugEnabled()) {
                LOG.debug("taskGroup[{}]'s task configs[{}]", this.taskGroupId,
                        JSON.toJSONString(taskConfigs));
            }

            int taskCountInThisTaskGroup = taskConfigs.size();
            LOG.info(String.format(
                    "taskGroupId=[%d] start [%d] channels for [%d] tasks.",
                    this.taskGroupId, channelNumber, taskCountInThisTaskGroup));

            this.containerCommunicator.registerCommunication(taskConfigs);

            Map<Integer, Configuration> taskConfigMap = buildTaskConfigMap(taskConfigs); // taskId -> task config
            List<Configuration> taskQueue = buildRemainTasks(taskConfigs); // tasks waiting to run
            Map<Integer, TaskExecutor> taskFailedExecutorMap = new HashMap<Integer, TaskExecutor>(); // taskId -> last failed executor
            List<TaskExecutor> runTasks = new ArrayList<TaskExecutor>(channelNumber); // running tasks
            Map<Integer, Long> taskStartTimeMap = new HashMap<Integer, Long>(); // task start times

            long lastReportTimeStamp = 0;
            Communication lastTaskGroupContainerCommunication = new Communication();

            while (true) {
                // 1. Check task states
                boolean failedOrKilled = false;
                Map<Integer, Communication> communicationMap = containerCommunicator.getCommunicationMap();
                for (Map.Entry<Integer, Communication> entry : communicationMap.entrySet()) {
                    Integer taskId = entry.getKey();
                    Communication taskCommunication = entry.getValue();
                    if (!taskCommunication.isFinished()) {
                        continue;
                    }
                    TaskExecutor taskExecutor = removeTask(runTasks, taskId);

                    // Removed from runTasks above, so remove it from the monitor as well
                    taskMonitor.removeTask(taskId);

                    // Failed: check whether the task supports failover and has retries left
                    if (taskCommunication.getState() == State.FAILED) {
                        taskFailedExecutorMap.put(taskId, taskExecutor);
                        if (taskExecutor.supportFailOver() && taskExecutor.getAttemptCount() < taskMaxRetryTimes) {
                            taskExecutor.shutdown(); // shut down the old executor
                            containerCommunicator.resetCommunication(taskId); // reset the task's state
                            Configuration taskConfig = taskConfigMap.get(taskId);
                            taskQueue.add(taskConfig); // put it back on the task queue
                        } else {
                            failedOrKilled = true;
                            break;
                        }
                    } else if (taskCommunication.getState() == State.KILLED) {
                        failedOrKilled = true;
                        break;
                    } else if (taskCommunication.getState() == State.SUCCEEDED) {
                        Long taskStartTime = taskStartTimeMap.get(taskId);
                        if (taskStartTime != null) {
                            Long usedTime = System.currentTimeMillis() - taskStartTime;
                            LOG.info("taskGroup[{}] taskId[{}] is successed, used[{}]ms",
                                    this.taskGroupId, taskId, usedTime);
                            // usedTime*1000*1000 converts ms into the ns that PerfRecord records. This is only a
                            // simple registration used to print the longest-running tasks, hence the dedicated static method
                            PerfRecord.addPerfRecord(taskGroupId, taskId, PerfRecord.PHASE.TASK_TOTAL, taskStartTime, usedTime * 1000L * 1000L);
                            taskStartTimeMap.remove(taskId);
                            taskConfigMap.remove(taskId);
                        }
                    }
                }

                // 2. If the overall taskExecutor state of this taskGroup is failed, report the error
                if (failedOrKilled) {
                    lastTaskGroupContainerCommunication = reportTaskGroupCommunication(
                            lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);

                    throw DataXException.asDataXException(
                            FrameworkErrorCode.PLUGIN_RUNTIME_ERROR, lastTaskGroupContainerCommunication.getThrowable());
                }

                // 3. Tasks are pending and fewer tasks are running than the channel limit
                Iterator<Configuration> iterator = taskQueue.iterator();
                while (iterator.hasNext() && runTasks.size() < channelNumber) {
                    Configuration taskConfig = iterator.next();
                    Integer taskId = taskConfig.getInt(CoreConstant.TASK_ID);
                    int attemptCount = 1;
                    TaskExecutor lastExecutor = taskFailedExecutorMap.get(taskId);
                    if (lastExecutor != null) {
                        attemptCount = lastExecutor.getAttemptCount() + 1;
                        long now = System.currentTimeMillis();
                        long failedTime = lastExecutor.getTimeStamp();
                        if (now - failedTime < taskRetryIntervalInMsec) { // retry interval not reached yet; keep it in the queue
                            continue;
                        }
                        if (!lastExecutor.isShutdown()) { // the previously failed task has not exited yet
                            if (now - failedTime > taskMaxWaitInMsec) {
                                markCommunicationFailed(taskId);
                                reportTaskGroupCommunication(lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
                                // message: "task failover wait timed out"
                                throw DataXException.asDataXException(CommonErrorCode.WAIT_TIME_EXCEED, "task failover等待超时");
                            } else {
                                lastExecutor.shutdown(); // try shutting it down again
                                continue;
                            }
                        } else {
                            LOG.info("taskGroup[{}] taskId[{}] attemptCount[{}] has already shutdown",
                                    this.taskGroupId, taskId, lastExecutor.getAttemptCount());
                        }
                    }
                    Configuration taskConfigForRun = taskMaxRetryTimes > 1 ? taskConfig.clone() : taskConfig;
                    TaskExecutor taskExecutor = new TaskExecutor(taskConfigForRun, attemptCount);
                    taskStartTimeMap.put(taskId, System.currentTimeMillis());
                    taskExecutor.doStart();

                    iterator.remove();
                    runTasks.add(taskExecutor);

                    // The task was added to runTasks above, so register it with the monitor
                    taskMonitor.registerTask(taskId, this.containerCommunicator.getCommunication(taskId));

                    taskFailedExecutorMap.remove(taskId);
                    LOG.info("taskGroup[{}] taskId[{}] attemptCount[{}] is started",
                            this.taskGroupId, taskId, attemptCount);
                }

                // 4. Task queue is empty, executors have finished, and the collected state is SUCCEEDED ---> success
                if (taskQueue.isEmpty() && isAllTaskDone(runTasks) && containerCommunicator.collectState() == State.SUCCEEDED) {
                    // Report once on success too; otherwise the collected statistics are inaccurate when tasks finish very quickly
                    lastTaskGroupContainerCommunication = reportTaskGroupCommunication(
                            lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);

                    LOG.info("taskGroup[{}] completed it's tasks.", this.taskGroupId);
                    break;
                }

                // 5. If the report interval has elapsed, report immediately
                long now = System.currentTimeMillis();
                if (now - lastReportTimeStamp > reportIntervalInMillSec) {
                    lastTaskGroupContainerCommunication = reportTaskGroupCommunication(
                            lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);

                    lastReportTimeStamp = now;

                    // taskMonitor checks every running task once per reportIntervalInMillSec
                    for (TaskExecutor taskExecutor : runTasks) {
                        taskMonitor.report(taskExecutor.getTaskId(), this.containerCommunicator.getCommunication(taskExecutor.getTaskId()));
                    }
                }

                Thread.sleep(sleepIntervalInMillSec);
            }

            // 6. Report one final time
            reportTaskGroupCommunication(lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);

        } catch (Throwable e) {
            Communication nowTaskGroupContainerCommunication = this.containerCommunicator.collect();

            if (nowTaskGroupContainerCommunication.getThrowable() == null) {
                nowTaskGroupContainerCommunication.setThrowable(e);
            }
            nowTaskGroupContainerCommunication.setState(State.FAILED);
            this.containerCommunicator.report(nowTaskGroupContainerCommunication);

            throw DataXException.asDataXException(
                    FrameworkErrorCode.RUNTIME_ERROR, e);
        } finally {
            if (!PerfTrace.getInstance().isJob()) {
                // Finally, print the average CPU usage and GC statistics
                VMInfo vmInfo = VMInfo.getVmInfo();
                if (vmInfo != null) {
                    vmInfo.getDelta(false);
                    LOG.info(vmInfo.totalString());
                }

                LOG.info(PerfTrace.getInstance().summarizeNoException());
            }
        }
    }

    private Map<Integer, Configuration> buildTaskConfigMap(List<Configuration> configurations) {
        Map<Integer, Configuration> map = new HashMap<Integer, Configuration>();
        for (Configuration taskConfig : configurations) {
            int taskId = taskConfig.getInt(CoreConstant.TASK_ID);
            map.put(taskId, taskConfig);
        }
        return map;
    }

    private List<Configuration> buildRemainTasks(List<Configuration> configurations) {
        List<Configuration> remainTasks = new LinkedList<Configuration>();
        for (Configuration taskConfig : configurations) {
            remainTasks.add(taskConfig);
        }
        return remainTasks;
    }

    private TaskExecutor removeTask(List<TaskExecutor> taskList, int taskId) {
        Iterator<TaskExecutor> iterator = taskList.iterator();
        while (iterator.hasNext()) {
            TaskExecutor taskExecutor = iterator.next();
            if (taskExecutor.getTaskId() == taskId) {
                iterator.remove();
                return taskExecutor;
            }
        }
        return null;
    }

    private boolean isAllTaskDone(List<TaskExecutor> taskList) {
        for (TaskExecutor taskExecutor : taskList) {
            if (!taskExecutor.isTaskFinished()) {
                return false;
            }
        }
        return true;
    }

    private Communication reportTaskGroupCommunication(Communication lastTaskGroupContainerCommunication, int taskCount) {
        Communication nowTaskGroupContainerCommunication = this.containerCommunicator.collect();
        nowTaskGroupContainerCommunication.setTimestamp(System.currentTimeMillis());
        Communication reportCommunication = CommunicationTool.getReportCommunication(nowTaskGroupContainerCommunication,
                lastTaskGroupContainerCommunication, taskCount);
        this.containerCommunicator.report(reportCommunication);
        return reportCommunication;
    }

    private void markCommunicationFailed(Integer taskId) {
        Communication communication = containerCommunicator.getCommunication(taskId);
        communication.setState(State.FAILED);
    }

    /**
     * TaskExecutor executes one complete task,
     * which consists of a reader and a writer in a 1:1 pairing
     */
    class TaskExecutor {
        private Configuration taskConfig;

        private int taskId;

        private int attemptCount;

        private Channel channel;

        private Thread readerThread;

        private Thread writerThread;

        private ReaderRunner readerRunner;

        private WriterRunner writerRunner;

        /**
         * This taskCommunication is used in several places:
         * 1. the channel
         * 2. readerRunner and writerRunner
         * 3. the taskPluginCollector of the reader and the writer
         */
        private Communication taskCommunication;

        public TaskExecutor(Configuration taskConf, int attemptCount) {
            // Get this taskExecutor's configuration
            this.taskConfig = taskConf;
            // message: "[reader|writer] plugin parameters must not be empty!"
            Validate.isTrue(null != this.taskConfig.getConfiguration(CoreConstant.JOB_READER)
                            && null != this.taskConfig.getConfiguration(CoreConstant.JOB_WRITER),
                    "[reader|writer]的插件参数不能为空!");

            // Get the taskId
            this.taskId = this.taskConfig.getInt(CoreConstant.TASK_ID);
            this.attemptCount = attemptCount;

            /**
             * Look up this taskExecutor's Communication by taskId.
             * It is passed to readerRunner and writerRunner, and to the channel for statistics
             */
            this.taskCommunication = containerCommunicator
                    .getCommunication(taskId);
            // message: "Communication for taskId[%d] has not been registered"
            Validate.notNull(this.taskCommunication,
                    String.format("taskId[%d]的Communication没有注册过", taskId));
            this.channel = ClassUtil.instantiate(channelClazz,
                    Channel.class, configuration);
            this.channel.setCommunication(this.taskCommunication);

            /**
             * Get the transformer parameters
             */
            List<TransformerExecution> transformerInfoExecs = TransformerUtil.buildTransformerInfo(taskConfig);

            /**
             * Create the writerThread
             */
            writerRunner = (WriterRunner) generateRunner(PluginType.WRITER);
            this.writerThread = new Thread(writerRunner,
                    String.format("%d-%d-%d-writer",
                            jobId, taskGroupId, this.taskId));
            // Setting the thread's contextClassLoader lets the plugin use a class loader different from the main program's
            this.writerThread.setContextClassLoader(LoadUtil.getJarLoader(
                    PluginType.WRITER, this.taskConfig.getString(
                            CoreConstant.JOB_WRITER_NAME)));

            /**
             * Create the readerThread
             */
            readerRunner = (ReaderRunner) generateRunner(PluginType.READER, transformerInfoExecs);
            this.readerThread = new Thread(readerRunner,
                    String.format("%d-%d-%d-reader",
                            jobId, taskGroupId, this.taskId));
            /**
             * Setting the thread's contextClassLoader lets the plugin use a class loader different from the main program's
             */
            this.readerThread.setContextClassLoader(LoadUtil.getJarLoader(
                    PluginType.READER, this.taskConfig.getString(
                            CoreConstant.JOB_READER_NAME)));
        }

        public void doStart() {
            this.writerThread.start();

            // The reader has not started yet, so the writer cannot have finished normally
            if (!this.writerThread.isAlive() || this.taskCommunication.getState() == State.FAILED) {
                throw DataXException.asDataXException(
                        FrameworkErrorCode.RUNTIME_ERROR,
                        this.taskCommunication.getThrowable());
            }

            this.readerThread.start();

            // The reader may finish very quickly here
            if (!this.readerThread.isAlive() && this.taskCommunication.getState() == State.FAILED) {
                // The reader may die right after starting; in that case throw immediately
                throw DataXException.asDataXException(
                        FrameworkErrorCode.RUNTIME_ERROR,
                        this.taskCommunication.getThrowable());
            }
        }

        private AbstractRunner generateRunner(PluginType pluginType) {
            return generateRunner(pluginType, null);
        }

        private AbstractRunner generateRunner(PluginType pluginType, List<TransformerExecution> transformerInfoExecs) {
            AbstractRunner newRunner = null;
            TaskPluginCollector pluginCollector;

            switch (pluginType) {
                case READER:
                    newRunner = LoadUtil.loadPluginRunner(pluginType,
                            this.taskConfig.getString(CoreConstant.JOB_READER_NAME));
                    newRunner.setJobConf(this.taskConfig.getConfiguration(
                            CoreConstant.JOB_READER_PARAMETER));

                    pluginCollector = ClassUtil.instantiate(
                            taskCollectorClass, AbstractTaskPluginCollector.class,
                            configuration, this.taskCommunication,
                            PluginType.READER);

                    RecordSender recordSender;
                    if (transformerInfoExecs != null && transformerInfoExecs.size() > 0) {
                        recordSender = new BufferedRecordTransformerExchanger(taskGroupId, this.taskId, this.channel,
                                this.taskCommunication, pluginCollector, transformerInfoExecs);
                    } else {
                        recordSender = new BufferedRecordExchanger(this.channel, pluginCollector);
                    }

                    ((ReaderRunner) newRunner).setRecordSender(recordSender);

                    /**
                     * Set the taskPlugin's collector, which handles dirty data and job/task communication
                     */
                    newRunner.setTaskPluginCollector(pluginCollector);
                    break;
                case WRITER:
                    newRunner = LoadUtil.loadPluginRunner(pluginType,
                            this.taskConfig.getString(CoreConstant.JOB_WRITER_NAME));
                    newRunner.setJobConf(this.taskConfig
                            .getConfiguration(CoreConstant.JOB_WRITER_PARAMETER));

                    pluginCollector = ClassUtil.instantiate(
                            taskCollectorClass, AbstractTaskPluginCollector.class,
                            configuration, this.taskCommunication,
                            PluginType.WRITER);
                    ((WriterRunner) newRunner).setRecordReceiver(new BufferedRecordExchanger(
                            this.channel, pluginCollector));
                    /**
                     * Set the taskPlugin's collector, which handles dirty data and job/task communication
                     */
                    newRunner.setTaskPluginCollector(pluginCollector);
                    break;
                default:
                    throw DataXException.asDataXException(FrameworkErrorCode.ARGUMENT_ERROR, "Cant generateRunner for:" + pluginType);
            }

            newRunner.setTaskGroupId(taskGroupId);
            newRunner.setTaskId(this.taskId);
            newRunner.setRunnerCommunication(this.taskCommunication);

            return newRunner;
        }

        // Check whether the task has finished
        private boolean isTaskFinished() {
            // If the reader or the writer has not finished its work, the task is not finished
            if (readerThread.isAlive() || writerThread.isAlive()) {
                return false;
            }

            if (taskCommunication == null || !taskCommunication.isFinished()) {
                return false;
            }

            return true;
        }

        private int getTaskId() {
            return taskId;
        }

        private long getTimeStamp() {
            return taskCommunication.getTimestamp();
        }

        private int getAttemptCount() {
            return attemptCount;
        }

        private boolean supportFailOver() {
            return writerRunner.supportFailOver();
        }

        private void shutdown() {
            writerRunner.shutdown();
            readerRunner.shutdown();
            if (writerThread.isAlive()) {
                writerThread.interrupt();
            }
            if (readerThread.isAlive()) {
                readerThread.interrupt();
            }
        }

        private boolean isShutdown() {
            return !readerThread.isAlive() && !writerThread.isAlive();
        }
    }
}
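The threading inside TaskExecutor follows a producer/consumer pattern. The sketch below reproduces only that pattern, under these assumptions: a bounded `ArrayBlockingQueue` stands in for DataX's `Channel`, plain strings stand in for records, and a private `END` object marks the end of data (DataX uses its own mechanism for this). `MiniTaskExecutor` is hypothetical; the thread names follow the `jobId-taskGroupId-taskId-reader/writer` format from the source, and, as in doStart, the writer is started before the reader.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch of a TaskExecutor-style pipeline: a reader thread fills a bounded channel,
// a writer thread drains it. Not DataX's actual classes.
class MiniTaskExecutor {
    private static final Object END = new Object(); // end-of-data marker (our own)

    private final BlockingQueue<Object> channel;
    private final List<String> written = new ArrayList<>(); // touched only by the writer thread
    private final Thread readerThread;
    private final Thread writerThread;

    MiniTaskExecutor(List<String> source, int channelCapacity, long jobId, int taskGroupId, int taskId) {
        this.channel = new ArrayBlockingQueue<>(channelCapacity);
        this.readerThread = new Thread(() -> {
            try {
                for (String record : source) {
                    channel.put(record); // blocks while the channel is full
                }
                channel.put(END); // tell the writer no more data is coming
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, String.format("%d-%d-%d-reader", jobId, taskGroupId, taskId));
        this.writerThread = new Thread(() -> {
            try {
                while (true) {
                    Object record = channel.take(); // blocks while the channel is empty
                    if (record == END) {
                        break;
                    }
                    written.add((String) record);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, String.format("%d-%d-%d-writer", jobId, taskGroupId, taskId));
    }

    // Like doStart(): start the writer first, then the reader.
    void doStart() {
        writerThread.start();
        readerThread.start();
    }

    // Simplified isTaskFinished(): only checks that both threads have exited.
    boolean isTaskFinished() {
        return !readerThread.isAlive() && !writerThread.isAlive();
    }

    // Wait for both threads; join() makes the writer's additions visible here.
    List<String> awaitAndGetWritten() {
        try {
            readerThread.join();
            writerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the task", e);
        }
        return written;
    }
}
```

Because the channel is bounded, a fast reader in this sketch blocks on `put` until the writer catches up, so the number of buffered records never exceeds the channel capacity. The real isTaskFinished additionally requires the task's Communication to be finished.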
