Flink提交任务的方法是什么-成都创新互联网站建设

关于创新互联

多方位宣传企业产品与服务 突出企业形象

公司简介 公司的服务 荣誉资质 新闻动态 联系我们

Flink提交任务的方法是什么

本篇内容主要讲解“Flink提交任务的方法是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Flink提交任务的方法是什么”吧!

创新互联建站专注于网站设计制作、成都网站设计、网页设计、网站制作、网站开发。公司秉持“客户至上,用心服务”的宗旨,从客户的利益和观点出发,让客户在网络营销中找到自己的驻足之地。尊重和关怀每一位客户,用严谨的态度对待客户,用专业的服务创造价值,成为客户值得信赖的朋友,为客户解除后顾之忧。

一、关键组件

任务提交过程中有三个重要组件:Dispatcher、JobMaster、JobManagerRunnerImpl。通过下面调用路径先找到MiniDispatcher:

YarnJobClusterEntrypoint的main() -> ClusterEntrypoint的runCluster() -> DefaultDispatcherResourceManagerComponentFactory的create() -> DefaultDispatcherRunnerFactory的createDispatcherRunner() -> DefaultDispatcherRunner的grantLeadership() -> JobDispatcherLeaderProcess的onStart() -> DefaultDispatcherGatewayServiceFactory的create() -> JobDispatcherFactory的createDispatcher() -> MiniDispatcher的start()

Flink提交任务的方法是什么

(1)Dispatcher

负责接收任务提交请求,并分给JobManager执行;

Dispatcher启动时,会运行startRecoveredJobs()来启动需要恢复的任务。当Flink on Yarn模式时,MiniDispatcher将当前任务传入到需要恢复的任务中,这样就实现了任务的提交启动

(2)JobManagerRunner

负责运行JobMaster

(3)JobMaster

负责运行任务,对应旧版的JobManager;

一个任务对应一个JobMaster;

二、JobMaster执行任务

在JobMaster中通过Scheduler、Execution组件来执行一个任务。将任务DAG中每个节点算子分配给TaskManager中的TaskExecutor运行。

Flink提交任务的方法是什么

Execution的start()方法中通过rpc远程调用TaskExecutor的submitTask()方法:

	public void deploy() throws JobException {
		
        ......
		try {

			......

			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

			final ComponentMainThreadExecutor jobMasterMainThreadExecutor =
				vertex.getExecutionGraph().getJobMasterMainThreadExecutor();

			
			CompletableFuture.supplyAsync(() -> taskManagerGateway.submitTask(deployment, rpcTimeout), executor)
				.thenCompose(Function.identity())
				.whenCompleteAsync(
					.....,
					jobMasterMainThreadExecutor);

		}
		catch (Throwable t) {
			......
		}
	}

三、TaskExecutor运行算子节点任务

Flink提交任务的方法是什么

TaskExecutor的submitTask()方法中通过创建org.apache.flink.runtime.taskmanager.Task来运行算子任务。Task的doRun()方法中通过算子节点对应的执行类AbstractInvokable来运行算子的处理逻辑,每个算子对应的执行类AbstractInvokable在客户端提交任务时确定,StreamExecutionEnvironment的addOperator():

	public  void addOperator(
			Integer vertexID,
			@Nullable String slotSharingGroup,
			@Nullable String coLocationGroup,
			StreamOperatorFactory operatorFactory,
			TypeInformation inTypeInfo,
			TypeInformation outTypeInfo,
			String operatorName) {
		Class invokableClass =
				operatorFactory.isStreamSource() ? SourceStreamTask.class : OneInputStreamTask.class;
		addOperator(vertexID, slotSharingGroup, coLocationGroup, operatorFactory, inTypeInfo,
				outTypeInfo, operatorName, invokableClass);
	}

当是流式任务时,调用StreamTask的invoke()方法。当是source节点时,通过调用链 StreamTask.invoke() -> StreamTask.runMailboxLoop() -> MailboxProcessor.runMailboxLoop() -> SourceStreamTask.processInput() :

	protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {

		controller.suspendDefaultAction();

		// Against the usual contract of this method, this implementation is not step-wise but blocking instead for
		// compatibility reasons with the current source interface (source functions run as a loop, not in steps).
		sourceThread.setTaskDescription(getName());
		sourceThread.start();
		sourceThread.getCompletionFuture().whenComplete((Void ignore, Throwable sourceThreadThrowable) -> {
			if (isCanceled() && ExceptionUtils.findThrowable(sourceThreadThrowable, InterruptedException.class).isPresent()) {
				mailboxProcessor.reportThrowable(new CancelTaskException(sourceThreadThrowable));
			} else if (!isFinished && sourceThreadThrowable != null) {
				mailboxProcessor.reportThrowable(sourceThreadThrowable);
			} else {
				mailboxProcessor.allActionsCompleted();
			}
		});
	}

创建线程LegacySourceFunctionThread实例,来开启单独生产数据的线程。LegacySourceFunctionThread的run()方法中调用StreamSource的run()方法:

	public void run(final Object lockingObject,
			final StreamStatusMaintainer streamStatusMaintainer,
			final Output> collector,
			final OperatorChain operatorChain) throws Exception {

		final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();

		final Configuration configuration = this.getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration();
		final long latencyTrackingInterval = getExecutionConfig().isLatencyTrackingConfigured()
			? getExecutionConfig().getLatencyTrackingInterval()
			: configuration.getLong(MetricOptions.LATENCY_INTERVAL);

		LatencyMarksEmitter latencyEmitter = null;
		if (latencyTrackingInterval > 0) {
			latencyEmitter = new LatencyMarksEmitter<>(
				getProcessingTimeService(),
				collector,
				latencyTrackingInterval,
				this.getOperatorID(),
				getRuntimeContext().getIndexOfThisSubtask());
		}

		final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();

		this.ctx = StreamSourceContexts.getSourceContext(
			timeCharacteristic,
			getProcessingTimeService(),
			lockingObject,
			streamStatusMaintainer,
			collector,
			watermarkInterval,
			-1);

		try {
			userFunction.run(ctx);

			// if we get here, then the user function either exited after being done (finite source)
			// or the function was canceled or stopped. For the finite source case, we should emit
			// a final watermark that indicates that we reached the end of event-time, and end inputs
			// of the operator chain
			if (!isCanceledOrStopped()) {
				// in theory, the subclasses of StreamSource may implement the BoundedOneInput interface,
				// so we still need the following call to end the input
				synchronized (lockingObject) {
					operatorChain.endHeadOperatorInput(1);
				}
			}
		} finally {
			if (latencyEmitter != null) {
				latencyEmitter.close();
			}
		}
	}

StreamSource的run()方法中调用 userFunction.run(ctx);  当数据源是kafka时,userFunction为FlinkKafkaConsumerBase

3.1 userFunction和 headOperator

最后执行run()的headOperator和算子程序userFunction是在添加算子时确定的,比如添加kafka数据源时

 environment.addSource(new FlinkKafkaConsumer(......));

最后调用的addSource()方法:

	public  DataStreamSource addSource(SourceFunction function, String sourceName, TypeInformation typeInfo) {

		TypeInformation resolvedTypeInfo = getTypeInfo(function, sourceName, SourceFunction.class, typeInfo);

		boolean isParallel = function instanceof ParallelSourceFunction;

		clean(function);

		final StreamSource sourceOperator = new StreamSource<>(function);
		return new DataStreamSource<>(this, resolvedTypeInfo, sourceOperator, isParallel, sourceName);
	}

headOperator为StreamSource,StreamSource中的userFunction为FlinkKafkaConsumer

到此,相信大家对“Flink提交任务的方法是什么”有了更深的了解,不妨来实际操作一番吧!这里是创新互联网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!


分享标题:Flink提交任务的方法是什么
文章网址:http://kswsj.cn/article/jpjdii.html

其他资讯