  • Oblique photography test data, s3m tile data
  • Tile data download

    2014-04-10 18:08:15
    I recently needed some tile data

    I recently needed some tile data, so I took a look at the tiles. Because the tiles were cut with ArcGIS's default Web Mercator tiling scheme, I found that for the same row, column, and zoom level the tiles returned by Google, 天地图, and ArcGIS Online are different.

    A comparison:

    天地图:***********************************************************************************************************************************

    Google Maps: http://mt0.google.cn/vt/lyrs=s@147&hl=x-local&gl=CN&src=app&x=107912&s=&y=49664&z=17&s=
                 (https://ditu.google.com/)

                 https://khms0.google.com/kh/v=147&src=app&x=107910&s=&y=49665&z=17&s=
                 (https://maps.google.com/)

    ArcGIS online:http://server.arcgisonline.com/arcgis/rest/services/World_Imagery/MapServer/tile/17/49665/107910

    For the same level 17 tile, row 49665 and column 107910, the tiles returned are different. I am not sure which map has applied an offset; the 天地图 and ArcGIS Online tiles are consistent with each other.

    The tile from https://ditu.google.com/ does not match the others; has an offset been applied there?

    
  • To improve the degree of automation in segmenting virtual human slice data, a segmentation method based on support vector machines is proposed. The segmentation strategy and steps are given, and the choice of SVM kernel and related parameters for slice data segmentation is discussed. Experiments show that, using the spatial ... of the virtual human slice data ...
  • Load ArcGIS 10 tile data into a SQLite database; in testing, 10 GB of data took about 20 minutes to run.
  • Flink 1.7.2 DataSet: file split computation and split data reading, a source code analysis. Source code: https://github.com/opensourceteams/flink-maven-scala ...

    Flink 1.7.2 DataSet: file split computation and split data reading, a source code analysis

    Source code

    https://github.com/opensourceteams/flink-maven-scala

    Overview

    • How the file or directory being read is actually split into input splits
    • The rules a task follows when reading the data of its split

    Conclusions on reading the data file (the two cases are illustrated by the sketch below)

    Splits whose start offset is 0

    • Actual start position: 0
    • End position: keep reading line by line until the position index is greater than or equal to the split size, then read up to 1 MB of the next split's data. Since the current split's data has all been consumed at that point, overLimit = true, but one more line is still read from the next split to finish the record that crosses the boundary.

    Splits whose start offset is greater than 0

    • Actual start position: starting from the offset assigned to the split, find the first newline and begin at that position + 1.
    • End position: once the position index read is greater than or equal to the split size, this split is finished; if the next split still has data, keep reading from the next split up to its first newline to complete the last record; if there is no next split, stop at the current position.
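
    The two cases above can be illustrated with a small self-contained sketch (plain Scala, not Flink code) that applies the same rules to the 9-byte sample file used in this post: a split whose start offset is greater than 0 first skips everything up to and including the first newline, and a record belongs to the split in which it starts, even if it crosses the split boundary.

      import java.nio.charset.StandardCharsets

      // Minimal sketch of the split-reading rules, assuming the 9-byte sample file below.
      object SplitReadSketch {
        val data: Array[Byte] = "c a a\nb c".getBytes(StandardCharsets.UTF_8) // 9 bytes, '\n' at index 5

        /** Records produced by one split (start, length), following the rules above. */
        def readSplit(start: Int, length: Int): Seq[String] = {
          // a split starting after offset 0 skips up to and including the first newline
          var pos =
            if (start == 0) 0
            else {
              val nl = data.indexOf('\n'.toByte, start)
              if (nl < 0) return Seq.empty else nl + 1
            }
          val records = scala.collection.mutable.ArrayBuffer.empty[String]
          // a record is read by the split in which it starts, even if it ends in the next split
          while (pos < start + length && pos < data.length) {
            val nl  = data.indexOf('\n'.toByte, pos)
            val end = if (nl < 0) data.length else nl
            records += new String(data, pos, end - pos, StandardCharsets.UTF_8)
            pos = end + 1
          }
          records.toList
        }

        def main(args: Array[String]): Unit = {
          println(readSplit(0, 5).mkString("|")) // "c a a"  -> split [0] line.txt:0+5
          println(readSplit(5, 4).mkString("|")) // "b c"    -> split [1] line.txt:5+4
        }
      }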

    Diagrams

    (figures omitted)

    Input data

    • Note the spaces: the first line is 6 bytes and the second line is 3 bytes (9 bytes of data in total, one of which is the newline character)
    c a a
    b c
    

    - Converted to integer byte values:

    99 32 97 32 97 10
    98 32 99
    

    WordCount.scala

    • A Java version would not change the analysis; only the way WordCount.scala is written differs, and the overall flow and logic are the same
    package com.opensourceteams.module.bigdata.flink.example.dataset.worldcount
    
    import com.opensourceteams.module.bigdata.flink.common.ConfigurationUtil
    import org.apache.flink.api.scala.ExecutionEnvironment
    
    
    /**
      * Batch processing: the DataSet WordCount job used for this analysis
      */
    object WordCountRun {
    
    
      def main(args: Array[String]): Unit = {
    
    // for debugging: configure timeouts
        val env : ExecutionEnvironment= ExecutionEnvironment.createLocalEnvironment(ConfigurationUtil.getConfiguration(true))
        env.setParallelism(2)
    
        val dataSet = env.readTextFile("file:/opt/n_001_workspaces/bigdata/flink/flink-maven-scala-2/src/main/resources/data/line.txt")
    
    
        import org.apache.flink.streaming.api.scala._
        val result = dataSet.flatMap(x => x.split(" ")).map((_,1)).groupBy(0).sum(1)
    
    
    
        result.print()
    
    
    
    
      }
    
    }
    
    
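    For reference, the counts this job should print for the sample input above can be checked with a small snippet that is independent of Flink (a sketch, assuming simple whitespace splitting):

      // expected word counts for "c a a\nb c" (the output order of result.print() may vary)
      val counts = "c a a\nb c"
        .split("\\s+")
        .groupBy(identity)
        .map { case (word, occurrences) => word -> occurrences.length }
      // counts contains a -> 2, b -> 1, c -> 2, i.e. the tuples (a,2), (b,1), (c,2)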

    Source code analysis (splitting the file into input splits)

    • This is a pre-split of the data. It is called a "pre" split because it is not exactly what gets read: the actual read takes more factors into account and the boundaries shift slightly, as explained in detail below.

    • The file is split into a number of FileInputSplits according to the parallelism. The number of FileInputSplit objects is not always exactly the parallelism; it is derived from the file size divided by the parallelism: if the division is exact, the number of splits equals the parallelism; if there is a remainder, it is either the parallelism or the parallelism + 1.

    • The split result for this example (the arithmetic is sketched right after):

      [0] file:/opt/n_001_workspaces/bigdata/flink/flink-maven-scala-2/src/main/resources/data/line.txt:0+5
      [1] file:/opt/n_001_workspaces/bigdata/flink/flink-maven-scala-2/src/main/resources/data/line.txt:5+4
      
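    These two splits can be reproduced from the split-size arithmetic analysed later in FileInputFormat.createInputSplits. A minimal sketch with this example's numbers (plain Scala, not Flink code):

      val totalLength  = 9L   // total size of the file in bytes
      val minNumSplits = 2    // the requested parallelism
      // there is a remainder, so each split may take one extra byte: 9 / 2 + 1 = 5
      val maxSplitSize = totalLength / minNumSplits +
        (if (totalLength % minNumSplits == 0) 0 else 1)
      // with a maximum split size of 5 the file becomes line.txt:0+5 and line.txt:5+4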

    ExecutionGraphBuilder.buildGraph

    • When the JobMaster is instantiated it builds the ExecutionGraph by calling ExecutionGraphBuilder.buildGraph(jobGraph)

    • The jobGraph is made up of JobVertex objects. executionGraph.attachJobGraph(sortedTopology) converts the JobGraph into an ExecutionGraph, which is made up of ExecutionJobVertex objects, i.e. each JobVertex is turned into an ExecutionJobVertex

      executionGraph.attachJobGraph(sortedTopology);
      
      sortedTopology = {ArrayList@5177}  size = 3
      

    0 = {InputFormatVertex@5459} "CHAIN DataSource (at com.opensourceteams.module.bigdata.flink.example.dataset.worldcount.WordCountRun$.main(WordCountRun.scala:19) (org.apache.flink.api.java.io.TextInp) -> FlatMap (FlatMap at com.opensourceteams.module.bigdata.flink.example.dataset.worldcount.WordCountRun$.main(WordCountRun.scala:23)) -> Map (Map at com.opensourceteams.module.bigdata.flink.example.dataset.worldcount.WordCountRun$.main(WordCountRun.scala:23)) -> Combine (SUM(1)) (org.apache.flink.runtime.operators.DataSourceTask)"
    1 = {JobVertex@5460} "Reduce (SUM(1)) (org.apache.flink.runtime.operators.BatchTask)"
    2 = {OutputFormatVertex@5461} "DataSink (collect()) (org.apache.flink.runtime.operators.DataSinkTask)"

    • Call ExecutionGraph.attachJobGraph
    /**
    	 * Builds the ExecutionGraph from the JobGraph.
    	 * If a prior execution graph exists, the JobGraph will be attached. If no prior execution
    	 * graph exists, then the JobGraph will become attach to a new empty execution graph.
    	 */
    	@Deprecated
    	public static ExecutionGraph buildGraph(
    			@Nullable ExecutionGraph prior,
    			JobGraph jobGraph,
    			Configuration jobManagerConfig,
    			ScheduledExecutorService futureExecutor,
    			Executor ioExecutor,
    			SlotProvider slotProvider,
    			ClassLoader classLoader,
    			CheckpointRecoveryFactory recoveryFactory,
    			Time rpcTimeout,
    			RestartStrategy restartStrategy,
    			MetricGroup metrics,
    			int parallelismForAutoMax,
    			BlobWriter blobWriter,
    			Time allocationTimeout,
    			Logger log)
    		throws JobExecutionException, JobException {
    
    		checkNotNull(jobGraph, "job graph cannot be null");
    
    		final String jobName = jobGraph.getName();
    		final JobID jobId = jobGraph.getJobID();
    
    		final FailoverStrategy.Factory failoverStrategy =
    				FailoverStrategyLoader.loadFailoverStrategy(jobManagerConfig, log);
    
    		final JobInformation jobInformation = new JobInformation(
    			jobId,
    			jobName,
    			jobGraph.getSerializedExecutionConfig(),
    			jobGraph.getJobConfiguration(),
    			jobGraph.getUserJarBlobKeys(),
    			jobGraph.getClasspaths());
    
    		// create a new execution graph, if none exists so far
    		final ExecutionGraph executionGraph;
    		try {
    			executionGraph = (prior != null) ? prior :
    				new ExecutionGraph(
    					jobInformation,
    					futureExecutor,
    					ioExecutor,
    					rpcTimeout,
    					restartStrategy,
    					failoverStrategy,
    					slotProvider,
    					classLoader,
    					blobWriter,
    					allocationTimeout);
    		} catch (IOException e) {
    			throw new JobException("Could not create the ExecutionGraph.", e);
    		}
    
    		// set the basic properties
    
    		executionGraph.setScheduleMode(jobGraph.getScheduleMode());
    		executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling());
    
    		try {
    			executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
    		}
    		catch (Throwable t) {
    			log.warn("Cannot create JSON plan for job", t);
    			// give the graph an empty plan
    			executionGraph.setJsonPlan("{}");
    		}
    
    		// initialize the vertices that have a master initialization hook
    		// file output formats create directories here, input formats create splits
    
    		final long initMasterStart = System.nanoTime();
    		log.info("Running initialization on master for job {} ({}).", jobName, jobId);
    
    		for (JobVertex vertex : jobGraph.getVertices()) {
    			String executableClass = vertex.getInvokableClassName();
    			if (executableClass == null || executableClass.isEmpty()) {
    				throw new JobSubmissionException(jobId,
    						"The vertex " + vertex.getID() + " (" + vertex.getName() + ") has no invokable class.");
    			}
    
    			if (vertex.getParallelism() == ExecutionConfig.PARALLELISM_AUTO_MAX) {
    				if (parallelismForAutoMax < 0) {
    					throw new JobSubmissionException(
    						jobId,
    						PARALLELISM_AUTO_MAX_ERROR_MESSAGE);
    				}
    				else {
    					vertex.setParallelism(parallelismForAutoMax);
    				}
    			}
    
    			try {
    				vertex.initializeOnMaster(classLoader);
    			}
    			catch (Throwable t) {
    					throw new JobExecutionException(jobId,
    							"Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(), t);
    			}
    		}
    
    		log.info("Successfully ran initialization on master in {} ms.",
    				(System.nanoTime() - initMasterStart) / 1_000_000);
    
    		// topologically sort the job vertices and attach the graph to the existing one
    		List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
    		if (log.isDebugEnabled()) {
    			log.debug("Adding {} vertices from job graph {} ({}).", sortedTopology.size(), jobName, jobId);
    		}
    		executionGraph.attachJobGraph(sortedTopology);
    
    		if (log.isDebugEnabled()) {
    			log.debug("Successfully created execution graph from job graph {} ({}).", jobName, jobId);
    		}
    
    		// configure the state checkpointing
    		JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();
    		if (snapshotSettings != null) {
    			List<ExecutionJobVertex> triggerVertices =
    					idToVertex(snapshotSettings.getVerticesToTrigger(), executionGraph);
    
    			List<ExecutionJobVertex> ackVertices =
    					idToVertex(snapshotSettings.getVerticesToAcknowledge(), executionGraph);
    
    			List<ExecutionJobVertex> confirmVertices =
    					idToVertex(snapshotSettings.getVerticesToConfirm(), executionGraph);
    
    			CompletedCheckpointStore completedCheckpoints;
    			CheckpointIDCounter checkpointIdCounter;
    			try {
    				int maxNumberOfCheckpointsToRetain = jobManagerConfig.getInteger(
    						CheckpointingOptions.MAX_RETAINED_CHECKPOINTS);
    
    				if (maxNumberOfCheckpointsToRetain <= 0) {
    					// warning and use 1 as the default value if the setting in
    					// state.checkpoints.max-retained-checkpoints is not greater than 0.
    					log.warn("The setting for '{} : {}' is invalid. Using default value of {}",
    							CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key(),
    							maxNumberOfCheckpointsToRetain,
    							CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue());
    
    					maxNumberOfCheckpointsToRetain = CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue();
    				}
    
    				completedCheckpoints = recoveryFactory.createCheckpointStore(jobId, maxNumberOfCheckpointsToRetain, classLoader);
    				checkpointIdCounter = recoveryFactory.createCheckpointIDCounter(jobId);
    			}
    			catch (Exception e) {
    				throw new JobExecutionException(jobId, "Failed to initialize high-availability checkpoint handler", e);
    			}
    
    			// Maximum number of remembered checkpoints
    			int historySize = jobManagerConfig.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);
    
    			CheckpointStatsTracker checkpointStatsTracker = new CheckpointStatsTracker(
    					historySize,
    					ackVertices,
    					snapshotSettings.getCheckpointCoordinatorConfiguration(),
    					metrics);
    
    			// The default directory for externalized checkpoints
    			String externalizedCheckpointsDir = jobManagerConfig.getString(CheckpointingOptions.CHECKPOINTS_DIRECTORY);
    
    			// load the state backend from the application settings
    			final StateBackend applicationConfiguredBackend;
    			final SerializedValue<StateBackend> serializedAppConfigured = snapshotSettings.getDefaultStateBackend();
    
    			if (serializedAppConfigured == null) {
    				applicationConfiguredBackend = null;
    			}
    			else {
    				try {
    					applicationConfiguredBackend = serializedAppConfigured.deserializeValue(classLoader);
    				} catch (IOException | ClassNotFoundException e) {
    					throw new JobExecutionException(jobId,
    							"Could not deserialize application-defined state backend.", e);
    				}
    			}
    
    			final StateBackend rootBackend;
    			try {
    				rootBackend = StateBackendLoader.fromApplicationOrConfigOrDefault(
    						applicationConfiguredBackend, jobManagerConfig, classLoader, log);
    			}
    			catch (IllegalConfigurationException | IOException | DynamicCodeLoadingException e) {
    				throw new JobExecutionException(jobId, "Could not instantiate configured state backend", e);
    			}
    
    			// instantiate the user-defined checkpoint hooks
    
    			final SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks = snapshotSettings.getMasterHooks();
    			final List<MasterTriggerRestoreHook<?>> hooks;
    
    			if (serializedHooks == null) {
    				hooks = Collections.emptyList();
    			}
    			else {
    				final MasterTriggerRestoreHook.Factory[] hookFactories;
    				try {
    					hookFactories = serializedHooks.deserializeValue(classLoader);
    				}
    				catch (IOException | ClassNotFoundException e) {
    					throw new JobExecutionException(jobId, "Could not instantiate user-defined checkpoint hooks", e);
    				}
    
    				final Thread thread = Thread.currentThread();
    				final ClassLoader originalClassLoader = thread.getContextClassLoader();
    				thread.setContextClassLoader(classLoader);
    
    				try {
    					hooks = new ArrayList<>(hookFactories.length);
    					for (MasterTriggerRestoreHook.Factory factory : hookFactories) {
    						hooks.add(MasterHooks.wrapHook(factory.create(), classLoader));
    					}
    				}
    				finally {
    					thread.setContextClassLoader(originalClassLoader);
    				}
    			}
    
    			final CheckpointCoordinatorConfiguration chkConfig = snapshotSettings.getCheckpointCoordinatorConfiguration();
    
    			executionGraph.enableCheckpointing(
    				chkConfig.getCheckpointInterval(),
    				chkConfig.getCheckpointTimeout(),
    				chkConfig.getMinPauseBetweenCheckpoints(),
    				chkConfig.getMaxConcurrentCheckpoints(),
    				chkConfig.getCheckpointRetentionPolicy(),
    				triggerVertices,
    				ackVertices,
    				confirmVertices,
    				hooks,
    				checkpointIdCounter,
    				completedCheckpoints,
    				rootBackend,
    				checkpointStatsTracker);
    		}
    
    		// create all the metrics for the Execution Graph
    
    		metrics.gauge(RestartTimeGauge.METRIC_NAME, new RestartTimeGauge(executionGraph));
    		metrics.gauge(DownTimeGauge.METRIC_NAME, new DownTimeGauge(executionGraph));
    		metrics.gauge(UpTimeGauge.METRIC_NAME, new UpTimeGauge(executionGraph));
    		metrics.gauge(NumberOfFullRestartsGauge.METRIC_NAME, new NumberOfFullRestartsGauge(executionGraph));
    
    		executionGraph.getFailoverStrategy().registerMetrics(metrics);
    
    		return executionGraph;
    	}
    
    

    ExecutionGraph.attachJobGraph

    • Each JobVertex is converted into an ExecutionJobVertex by calling new ExecutionJobVertex(). The ExecutionJobVertex holds the inputSplits, so the number of input splits is computed from the parallelism

      // create the execution job vertex and attach it to the graph
      		ExecutionJobVertex ejv = new ExecutionJobVertex(
      			this,
      			jobVertex,
      			1,
      			rpcTimeout,
      			globalModVersion,
      			createTimestamp);
      
    // --------------------------------------------------------------------------------------------
    	//  Actions
    	// --------------------------------------------------------------------------------------------
    
    	public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException {
    
    		LOG.debug("Attaching {} topologically sorted vertices to existing job graph with {} " +
    				"vertices and {} intermediate results.",
    				topologiallySorted.size(), tasks.size(), intermediateResults.size());
    
    		final ArrayList<ExecutionJobVertex> newExecJobVertices = new ArrayList<>(topologiallySorted.size());
    		final long createTimestamp = System.currentTimeMillis();
    
    		for (JobVertex jobVertex : topologiallySorted) {
    
    			if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
    				this.isStoppable = false;
    			}
    
    			// create the execution job vertex and attach it to the graph
    			ExecutionJobVertex ejv = new ExecutionJobVertex(
    				this,
    				jobVertex,
    				1,
    				rpcTimeout,
    				globalModVersion,
    				createTimestamp);
    
    			ejv.connectToPredecessors(this.intermediateResults);
    
    			ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
    			if (previousTask != null) {
    				throw new JobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",
    						jobVertex.getID(), ejv, previousTask));
    			}
    
    			for (IntermediateResult res : ejv.getProducedDataSets()) {
    				IntermediateResult previousDataSet = this.intermediateResults.putIfAbsent(res.getId(), res);
    				if (previousDataSet != null) {
    					throw new JobException(String.format("Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]",
    							res.getId(), res, previousDataSet));
    				}
    			}
    
    			this.verticesInCreationOrder.add(ejv);
    			this.numVerticesTotal += ejv.getParallelism();
    			newExecJobVertices.add(ejv);
    		}
    
    		terminationFuture = new CompletableFuture<>();
    		failoverStrategy.notifyNewVertices(newExecJobVertices);
    	}
    

    ExecutionJobVertex

    • The actual work is then done by calling FileInputFormat.createInputSplits(parallelism)

      	@SuppressWarnings("unchecked")
      		InputSplitSource<InputSplit> splitSource = (InputSplitSource<InputSplit>) jobVertex.getInputSplitSource();
      
      		if (splitSource != null) {
      			Thread currentThread = Thread.currentThread();
      			ClassLoader oldContextClassLoader = currentThread.getContextClassLoader();
      			currentThread.setContextClassLoader(graph.getUserClassLoader());
      			try {
      				inputSplits = splitSource.createInputSplits(numTaskVertices);
      
      				if (inputSplits != null) {
      					splitAssigner = splitSource.getInputSplitAssigner(inputSplits);
      				}
      			} finally {
      				currentThread.setContextClassLoader(oldContextClassLoader);
      			}
      		}
      
    	public ExecutionJobVertex(
    			ExecutionGraph graph,
    			JobVertex jobVertex,
    			int defaultParallelism,
    			Time timeout,
    			long initialGlobalModVersion,
    			long createTimestamp) throws JobException {
    
    		if (graph == null || jobVertex == null) {
    			throw new NullPointerException();
    		}
    
    		this.graph = graph;
    		this.jobVertex = jobVertex;
    
    		int vertexParallelism = jobVertex.getParallelism();
    		int numTaskVertices = vertexParallelism > 0 ? vertexParallelism : defaultParallelism;
    
    		final int configuredMaxParallelism = jobVertex.getMaxParallelism();
    
    		this.maxParallelismConfigured = (VALUE_NOT_SET != configuredMaxParallelism);
    
    		// if no max parallelism was configured by the user, we calculate and set a default
    		setMaxParallelismInternal(maxParallelismConfigured ?
    				configuredMaxParallelism : KeyGroupRangeAssignment.computeDefaultMaxParallelism(numTaskVertices));
    
    		// verify that our parallelism is not higher than the maximum parallelism
    		if (numTaskVertices > maxParallelism) {
    			throw new JobException(
    				String.format("Vertex %s's parallelism (%s) is higher than the max parallelism (%s). Please lower the parallelism or increase the max parallelism.",
    					jobVertex.getName(),
    					numTaskVertices,
    					maxParallelism));
    		}
    
    		this.parallelism = numTaskVertices;
    
    		this.serializedTaskInformation = null;
    
    		this.taskVertices = new ExecutionVertex[numTaskVertices];
    		this.operatorIDs = Collections.unmodifiableList(jobVertex.getOperatorIDs());
    		this.userDefinedOperatorIds = Collections.unmodifiableList(jobVertex.getUserDefinedOperatorIDs());
    
    		this.inputs = new ArrayList<>(jobVertex.getInputs().size());
    
    		// take the sharing group
    		this.slotSharingGroup = jobVertex.getSlotSharingGroup();
    		this.coLocationGroup = jobVertex.getCoLocationGroup();
    
    		// setup the coLocation group
    		if (coLocationGroup != null && slotSharingGroup == null) {
    			throw new JobException("Vertex uses a co-location constraint without using slot sharing");
    		}
    
    		// create the intermediate results
    		this.producedDataSets = new IntermediateResult[jobVertex.getNumberOfProducedIntermediateDataSets()];
    
    		for (int i = 0; i < jobVertex.getProducedDataSets().size(); i++) {
    			final IntermediateDataSet result = jobVertex.getProducedDataSets().get(i);
    
    			this.producedDataSets[i] = new IntermediateResult(
    					result.getId(),
    					this,
    					numTaskVertices,
    					result.getResultType());
    		}
    
    		Configuration jobConfiguration = graph.getJobConfiguration();
    		int maxPriorAttemptsHistoryLength = jobConfiguration != null ?
    				jobConfiguration.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE) :
    				JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue();
    
    		// create all task vertices
    		for (int i = 0; i < numTaskVertices; i++) {
    			ExecutionVertex vertex = new ExecutionVertex(
    					this,
    					i,
    					producedDataSets,
    					timeout,
    					initialGlobalModVersion,
    					createTimestamp,
    					maxPriorAttemptsHistoryLength);
    
    			this.taskVertices[i] = vertex;
    		}
    
    		// sanity check for the double referencing between intermediate result partitions and execution vertices
    		for (IntermediateResult ir : this.producedDataSets) {
    			if (ir.getNumberOfAssignedPartitions() != parallelism) {
    				throw new RuntimeException("The intermediate result's partitions were not correctly assigned.");
    			}
    		}
    
    		// set up the input splits, if the vertex has any
    		try {
    			@SuppressWarnings("unchecked")
    			InputSplitSource<InputSplit> splitSource = (InputSplitSource<InputSplit>) jobVertex.getInputSplitSource();
    
    			if (splitSource != null) {
    				Thread currentThread = Thread.currentThread();
    				ClassLoader oldContextClassLoader = currentThread.getContextClassLoader();
    				currentThread.setContextClassLoader(graph.getUserClassLoader());
    				try {
    					inputSplits = splitSource.createInputSplits(numTaskVertices);
    
    					if (inputSplits != null) {
    						splitAssigner = splitSource.getInputSplitAssigner(inputSplits);
    					}
    				} finally {
    					currentThread.setContextClassLoader(oldContextClassLoader);
    				}
    			}
    			else {
    				inputSplits = null;
    			}
    		}
    		catch (Throwable t) {
    			throw new JobException("Creating the input splits caused an error: " + t.getMessage(), t);
    		}
    	}
    
    

    FileInputFormat.createInputSplits

    • This is where the real work happens: the file is split into a FileInputSplit[] according to the parallelism

    • First, each configured path (file or directory) is traversed; all files involved are collected into List<FileStatus> files = new ArrayList<>() and their sizes are summed into totalLength. The split computation is based on this total size of all files

      // get all the files that are involved in the splits
      	List<FileStatus> files = new ArrayList<>();
      	long totalLength = 0;
      
      	for (Path path : getFilePaths()) {
      		final FileSystem fs = path.getFileSystem();
      		final FileStatus pathFile = fs.getFileStatus(path);
      
      		if (pathFile.isDir()) {
      			totalLength += addFilesInDir(path, files, true);
      		} else {
      			testForUnsplittable(pathFile);
      
      			files.add(pathFile);
      			totalLength += pathFile.getLen();
      		}
      	}
      
    • The maximum length of each split: totalLength = 9 is the total size of all files and minNumSplits = 2 is the parallelism. 9 is not evenly divisible by 2, so there is a remainder. Putting that remainder into an extra split of its own could produce a very small split and waste resources, so instead the remainder is spread over the preceding splits by allowing each split one extra byte; here the maximum split size is therefore 9 / 2 + 1 = 5

    final long maxSplitSize = totalLength / minNumSplits + (totalLength % minNumSplits == 0 ? 0 : 1);

    • The actual split size: blockSize here is the file size, and maxSplitSize is usually smaller than blockSize, so the value taken in the end is the maximum split length maxSplitSize

      final long splitSize = Math.max(minSplitSize, Math.min(maxSplitSize, blockSize));
      
    • When the last split is computed, if the remaining data is less than 1.1 times the split size, it is not split any further: all of it goes into the last split, because cutting again would leave a very small final split and waste resources

      final long maxBytesForLastSplit = (long) (splitSize * MAX_SPLIT_SIZE_DISCREPANCY);
      
    • How the splits are computed: the initial value is bytesUnassigned = len (the total file length); after each split is created, bytesUnassigned is reduced by the split size, so it always holds the amount of data still unassigned. While bytesUnassigned > maxBytesForLastSplit, the loop keeps creating splits of length splitSize (= 5 here); the first split starts at position 0 and each subsequent split starts after all previous ones, i.e. position += splitSize;

      			while (bytesUnassigned > maxBytesForLastSplit) {
      				// get the block containing the majority of the data
      				blockIndex = getBlockIndexForPosition(blocks, position, halfSplit, blockIndex);
      				// create a new split
      				FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(), position, splitSize,
      					blocks[blockIndex].getHosts());
      				inputSplits.add(fis);
      
      				// adjust the positions
      				position += splitSize;
      				bytesUnassigned -= splitSize;
      			}
      
    • The while loop only runs while bytesUnassigned > maxBytesForLastSplit; once bytesUnassigned <= maxBytesForLastSplit, all remaining data is put into the last split (see the sketch after the code below)

      	// assign the last split
      			if (bytesUnassigned > 0) {
      				blockIndex = getBlockIndexForPosition(blocks, position, halfSplit, blockIndex);
      				final FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(), position,
      					bytesUnassigned, blocks[blockIndex].getHosts());
      				inputSplits.add(fis);
      			}
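
    Putting this example's numbers into the loop above gives the same two splits as before. A minimal sketch (plain Scala, not Flink code; 1.1 is the MAX_SPLIT_SIZE_DISCREPANCY factor mentioned earlier):

      val len       = 9L                                    // total file length
      val splitSize = 5L                                    // maxSplitSize computed above
      val maxBytesForLastSplit = (splitSize * 1.1).toLong   // (long)(5 * 1.1) = 5

      val splits = scala.collection.mutable.ArrayBuffer.empty[(Long, Long)] // (start, length)
      var position = 0L
      var bytesUnassigned = len
      while (bytesUnassigned > maxBytesForLastSplit) {      // 9 > 5: exactly one iteration
        splits += ((position, splitSize))
        position += splitSize
        bytesUnassigned -= splitSize
      }
      if (bytesUnassigned > 0) splits += ((position, bytesUnassigned)) // the trailing 4 bytes
      // splits == ArrayBuffer((0,5), (5,4))  ->  line.txt:0+5 and line.txt:5+4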
      
    /**
    	 * Computes the input splits for the file. By default, one file block is one split. If more splits
    	 * are requested than blocks are available, then a split may be a fraction of a block and splits may cross
    	 * block boundaries.
    	 * 
    	 * @param minNumSplits The minimum desired number of file splits.
    	 * @return The computed file splits.
    	 * 
    	 * @see org.apache.flink.api.common.io.InputFormat#createInputSplits(int)
    	 */
    	@Override
    	public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException {
    		if (minNumSplits < 1) {
    			throw new IllegalArgumentException("Number of input splits has to be at least 1.");
    		}
    		
    		// take the desired number of splits into account
    		minNumSplits = Math.max(minNumSplits, this.numSplits);
    		
    		final List<FileInputSplit> inputSplits = new ArrayList<FileInputSplit>(minNumSplits);
    
    		// get all the files that are involved in the splits
    		List<FileStatus> files = new ArrayList<>();
    		long totalLength = 0;
    
    		for (Path path : getFilePaths()) {
    			final FileSystem fs = path.getFileSystem();
    			final FileStatus pathFile = fs.getFileStatus(path);
    
    			if (pathFile.isDir()) {
    				totalLength += addFilesInDir(path, files, true);
    			} else {
    				testForUnsplittable(pathFile);
    
    				files.add(pathFile);
    				totalLength += pathFile.getLen();
    			}
    		}
    
    		// returns if unsplittable
    		if (unsplittable) {
    			int splitNum = 0;
    			for (final FileStatus file : files) {
    				final FileSystem fs = file.getPath().getFileSystem();
    				final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, file.getLen());
    				Set<String> hosts = new HashSet<String>();
    				for(BlockLocation block : blocks) {
    					hosts.addAll(Arrays.asList(block.getHosts()));
    				}
    				long len = file.getLen();
    				if(testForUnsplittable(file)) {
    					len = READ_WHOLE_SPLIT_FLAG;
    				}
    				FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(), 0, len,
    						hosts.toArray(new String[hosts.size()]));
    				inputSplits.add(fis);
    			}
    			return inputSplits.toArray(new FileInputSplit[inputSplits.size()]);
    		}
    		
    
    		final long maxSplitSize = totalLength / minNumSplits + (totalLength % minNumSplits == 0 ? 0 : 1);
    
    		// now that we have the files, generate the splits
    		int splitNum = 0;
    		for (final FileStatus file : files) {
    
    			final FileSystem fs = file.getPath().getFileSystem();
    			final long len = file.getLen();
    			final long blockSize = file.getBlockSize();
    			
    			final long minSplitSize;
    			if (this.minSplitSize <= blockSize) {
    				minSplitSize = this.minSplitSize;
    			}
    			else {
    				if (LOG.isWarnEnabled()) {
    					LOG.warn("Minimal split size of " + this.minSplitSize + " is larger than the block size of " + 
    						blockSize + ". Decreasing minimal split size to block size.");
    				}
    				minSplitSize = blockSize;
    			}
    
    			final long splitSize = Math.max(minSplitSize, Math.min(maxSplitSize, blockSize));
    			final long halfSplit = splitSize >>> 1;
    
    			final long maxBytesForLastSplit = (long) (splitSize * MAX_SPLIT_SIZE_DISCREPANCY);
    
    			if (len > 0) {
    
    				// get the block locations and make sure they are in order with respect to their offset
    				final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, len);
    				Arrays.sort(blocks);
    
    				long bytesUnassigned = len;
    				long position = 0;
    
    				int blockIndex = 0;
    
    				while (bytesUnassigned > maxBytesForLastSplit) {
    					// get the block containing the majority of the data
    					blockIndex = getBlockIndexForPosition(blocks, position, halfSplit, blockIndex);
    					// create a new split
    					FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(), position, splitSize,
    						blocks[blockIndex].getHosts());
    					inputSplits.add(fis);
    
    					// adjust the positions
    					position += splitSize;
    					bytesUnassigned -= splitSize;
    				}
    
    				// assign the last split
    				if (bytesUnassigned > 0) {
    					blockIndex = getBlockIndexForPosition(blocks, position, halfSplit, blockIndex);
    					final FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(), position,
    						bytesUnassigned, blocks[blockIndex].getHosts());
    					inputSplits.add(fis);
    				}
    			} else {
    				// special case with a file of zero bytes size
    				final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, 0);
    				String[] hosts;
    				if (blocks.length > 0) {
    					hosts = blocks[0].getHosts();
    				} else {
    					hosts = new String[0];
    				}
    				final FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(), 0, 0, hosts);
    				inputSplits.add(fis);
    			}
    		}
    
    		return inputSplits.toArray(new FileInputSplit[inputSplits.size()]);
    	}
    
    

    Source code analysis (actually reading a split) (split one)

    DataSourceTask

    DataSourceTask.invoke()

    • The source's operator chain (ChainedFlatMapDriver, ChainedMapDriver, SynchronousChainedCombineDriver), i.e. FlatMap -> Map -> Combine (SUM(1)): every record the source reads passes through the operators on this chain

    // start all chained tasks
    BatchTask.openChainedTasks(this.chainedTasks, this);

    this.chainedTasks = {ArrayList@5459}  size = 3

    0 = {ChainedFlatMapDriver@5458}
    1 = {ChainedMapDriver@5505}
    2 = {SynchronousChainedCombineDriver@5506}

    • A split is handed to the current DataSourceTask more or less at random: since the source does not partition data by key, it does not matter which task handles which split; whenever a task asks for work it is simply given a split, and every split that is handed out is removed from the overall set

      final InputSplit split = splitIterator.next();
      
    • Information about the current split

          LOG.debug(getLogString("Opening input split " + split.toString()));
          
          
          
          
          13:04:02,082 DEBUG [CHAIN DataSource (at com.opensourceteams.module.bigdata.flink.example.dataset.worldcount.WordCountRun$.main(WordCountRun.scala:19) (org.apache.flink.api.java.io.TextInp) -> FlatMap (FlatMap at com.opensourceteams.module.bigdata.flink.example.dataset.worldcount.WordCountRun$.main(WordCountRun.scala:23)) -> Map (Map at com.opensourceteams.module.bigdata.flink.example.dataset.worldcount.WordCountRun$.main(WordCountRun.scala:23)) -> Combine (SUM(1)) (2/2)] org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:165)      - Opening input split [1] file:/opt/n_001_workspaces/bigdata/flink/flink-maven-scala-2/src/main/resources/data/line.txt:5+4:  CHAIN DataSource (at com.opensourceteams.module.bigdata.flink.example.dataset.worldcount.WordCountRun$.main(WordCountRun.scala:19) (org.apache.flink.api.java.io.TextInp) -> FlatMap (FlatMap at com.opensourceteams.module.bigdata.flink.example.dataset.worldcount.WordCountRun$.main(WordCountRun.scala:23)) -> Map (Map at com.opensourceteams.module.bigdata.flink.example.dataset.worldcount.WordCountRun$.main(WordCountRun.scala:23)) -> Combine (SUM(1)) (2/2)
      
    • Process the current split by calling DelimitedInputFormat.open(). open() does not actually read any records yet; it only positions the stream: the data up to and including the first newline is left to the previous split, and this split starts reading from the byte right after that newline

      format.open(split);
      
    @Override
    	public void invoke() throws Exception {
    		// --------------------------------------------------------------------
    		// Initialize
    		// --------------------------------------------------------------------
    		initInputFormat();
    
    		LOG.debug(getLogString("Start registering input and output"));
    
    		try {
    			initOutputs(getUserCodeClassLoader());
    		} catch (Exception ex) {
    			throw new RuntimeException("The initialization of the DataSource's outputs caused an error: " +
    					ex.getMessage(), ex);
    		}
    
    		LOG.debug(getLogString("Finished registering input and output"));
    
    		// --------------------------------------------------------------------
    		// Invoke
    		// --------------------------------------------------------------------
    		LOG.debug(getLogString("Starting data source operator"));
    
    		RuntimeContext ctx = createRuntimeContext();
    
    		final Counter numRecordsOut;
    		{
    			Counter tmpNumRecordsOut;
    			try {
    				OperatorIOMetricGroup ioMetricGroup = ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup();
    				ioMetricGroup.reuseInputMetricsForTask();
    				if (this.config.getNumberOfChainedStubs() == 0) {
    					ioMetricGroup.reuseOutputMetricsForTask();
    				}
    				tmpNumRecordsOut = ioMetricGroup.getNumRecordsOutCounter();
    			} catch (Exception e) {
    				LOG.warn("An exception occurred during the metrics setup.", e);
    				tmpNumRecordsOut = new SimpleCounter();
    			}
    			numRecordsOut = tmpNumRecordsOut;
    		}
    		
    		Counter completedSplitsCounter = ctx.getMetricGroup().counter("numSplitsProcessed");
    
    		if (RichInputFormat.class.isAssignableFrom(this.format.getClass())) {
    			((RichInputFormat) this.format).setRuntimeContext(ctx);
    			LOG.debug(getLogString("Rich Source detected. Initializing runtime context."));
    			((RichInputFormat) this.format).openInputFormat();
    			LOG.debug(getLogString("Rich Source detected. Opening the InputFormat."));
    		}
    
    		ExecutionConfig executionConfig = getExecutionConfig();
    
    		boolean objectReuseEnabled = executionConfig.isObjectReuseEnabled();
    
    		LOG.debug("DataSourceTask object reuse: " + (objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
    		
    		final TypeSerializer<OT> serializer = this.serializerFactory.getSerializer();
    		
    		try {
    			// start all chained tasks
    			BatchTask.openChainedTasks(this.chainedTasks, this);
    			
    			// get input splits to read
    			final Iterator<InputSplit> splitIterator = getInputSplits();
    			
    			// for each assigned input split
    			while (!this.taskCanceled && splitIterator.hasNext())
    			{
    				// get start and end
    				final InputSplit split = splitIterator.next();
    
    				LOG.debug(getLogString("Opening input split " + split.toString()));
    				
    				final InputFormat<OT, InputSplit> format = this.format;
    			
    				// open input format
    				format.open(split);
    	
    				LOG.debug(getLogString("Starting to read input from split " + split.toString()));
    				
    				try {
    					final Collector<OT> output = new CountingCollector<>(this.output, numRecordsOut);
    
    					if (objectReuseEnabled) {
    						OT reuse = serializer.createInstance();
    
    						// as long as there is data to read
    						while (!this.taskCanceled && !format.reachedEnd()) {
    
    							OT returned;
    							if ((returned = format.nextRecord(reuse)) != null) {
    								output.collect(returned);
    							}
    						}
    					} else {
    						// as long as there is data to read
    						while (!this.taskCanceled && !format.reachedEnd()) {
    							OT returned;
    							if ((returned = format.nextRecord(serializer.createInstance())) != null) {
    								output.collect(returned);
    							}
    						}
    					}
    
    					if (LOG.isDebugEnabled() && !this.taskCanceled) {
    						LOG.debug(getLogString("Closing input split " + split.toString()));
    					}
    				} finally {
    					// close. We close here such that a regular close throwing an exception marks a task as failed.
    					format.close();
    				}
    				completedSplitsCounter.inc();
    			} // end for all input splits
    
    			// close the collector. if it is a chaining task collector, it will close its chained tasks
    			this.output.close();
    
    			// close all chained tasks letting them report failure
    			BatchTask.closeChainedTasks(this.chainedTasks, this);
    
    		}
    		catch (Exception ex) {
    			// close the input, but do not report any exceptions, since we already have another root cause
    			try {
    				this.format.close();
    			} catch (Throwable ignored) {}
    
    			BatchTask.cancelChainedTasks(this.chainedTasks);
    
    			ex = ExceptionInChainedStubException.exceptionUnwrap(ex);
    
    			if (ex instanceof CancelTaskException) {
    				// forward canceling exception
    				throw ex;
    			}
    			else if (!this.taskCanceled) {
    				// drop exception, if the task was canceled
    				BatchTask.logAndThrowException(ex, this);
    			}
    		} finally {
    			BatchTask.clearWriters(eventualOutputs);
    			// --------------------------------------------------------------------
    			// Closing
    			// --------------------------------------------------------------------
    			if (this.format != null && RichInputFormat.class.isAssignableFrom(this.format.getClass())) {
    				((RichInputFormat) this.format).closeInputFormat();
    				LOG.debug(getLogString("Rich Source detected. Closing the InputFormat."));
    			}
    		}
    
    		if (!this.taskCanceled) {
    			LOG.debug(getLogString("Finished data source operator"));
    		}
    		else {
    			LOG.debug(getLogString("Data source operator cancelled"));
    		}
    	}
    
    

    DelimitedInputFormat.open()

    • Call FileInputFormat.open(split): record the current split's information (start position and length) and position the stream at the start
    • initBuffers(): initialize the buffers; the default readBuffer size is 1 MB and the wrapBuffer is 256 bytes
    • Call DelimitedInputFormat.readLine()
    /**
    	 * Opens the given input split. This method opens the input stream to the specified file, allocates read buffers
    	 * and positions the stream at the correct position, making sure that any partial record at the beginning is skipped.
    	 *
    	 * @param split The input split to open.
    	 *
    	 * @see org.apache.flink.api.common.io.FileInputFormat#open(org.apache.flink.core.fs.FileInputSplit)
    	 */
    	@Override
    	public void open(FileInputSplit split) throws IOException {
    		super.open(split);
    		initBuffers();
    
    		this.offset = splitStart;
    		if (this.splitStart != 0) {
    			this.stream.seek(offset);
    			readLine();
    			// if the first partial record already pushes the stream over
    			// the limit of our split, then no record starts within this split
    			if (this.overLimit) {
    				this.end = true;
    			}
    		} else {
    			fillBuffer(0);
    		}
    	}
    

    FileInputFormat.open(split)

    • Set the current split

      this.currentSplit = fileSplit;
      
    • Set the start position of the current split

      this.splitStart = fileSplit.getStart();
      
    • Set the length of the current split

      this.splitLength = fileSplit.getLength();
      
    • Seek the stream to the start position

      	// get FSDataInputStream
      		if (this.splitStart != 0) {
      			this.stream.seek(this.splitStart);
      		}
      
    /**
    	 * Opens an input stream to the file defined in the input format.
    	 * The stream is positioned at the beginning of the given split.
    	 * <p>
    	 * The stream is actually opened in an asynchronous thread to make sure any interruptions to the thread 
    	 * working on the input format do not reach the file system.
    	 */
    	@Override
    	public void open(FileInputSplit fileSplit) throws IOException {
    
    		this.currentSplit = fileSplit;
    		this.splitStart = fileSplit.getStart();
    		this.splitLength = fileSplit.getLength();
    
    		if (LOG.isDebugEnabled()) {
    			LOG.debug("Opening input split " + fileSplit.getPath() + " [" + this.splitStart + "," + this.splitLength + "]");
    		}
    
    		
    		// open the split in an asynchronous thread
    		final InputSplitOpenThread isot = new InputSplitOpenThread(fileSplit, this.openTimeout);
    		isot.start();
    		
    		try {
    			this.stream = isot.waitForCompletion();
    			this.stream = decorateInputStream(this.stream, fileSplit);
    		}
    		catch (Throwable t) {
    			throw new IOException("Error opening the Input Split " + fileSplit.getPath() + 
    					" [" + splitStart + "," + splitLength + "]: " + t.getMessage(), t);
    		}
    		
    		// get FSDataInputStream
    		if (this.splitStart != 0) {
    			this.stream.seek(this.splitStart);
    		}
    	}
    
    

    DelimitedInputFormat.initBuffers

    • The default readBuffer size is 1 MB and the wrapBuffer is 256 bytes

    • Default values set for the current split

      	this.readPos = 0;
      		this.limit = 0;
      		this.overLimit = false;
      		this.end = false;
      
    private void initBuffers() {
    		this.bufferSize = this.bufferSize <= 0 ? DEFAULT_READ_BUFFER_SIZE : this.bufferSize;
    
    		if (this.bufferSize <= this.delimiter.length) {
    			throw new IllegalArgumentException("Buffer size must be greater than length of delimiter.");
    		}
    
    		if (this.readBuffer == null || this.readBuffer.length != this.bufferSize) {
    			this.readBuffer = new byte[this.bufferSize];
    		}
    		if (this.wrapBuffer == null || this.wrapBuffer.length < 256) {
    			this.wrapBuffer = new byte[256];
    		}
    
    		this.readPos = 0;
    		this.limit = 0;
    		this.overLimit = false;
    		this.end = false;
    	}
    

    DelimitedInputFormat.readLine()

    • (Fill the read buffer) Call DelimitedInputFormat.fillBuffer() to read data into the buffer: if the split has more than 1 MB of data left, 1 MB is read; otherwise the rest of the current split is read
    • Read one line of data, i.e. read up to the first newline:
      			// Search for next occurrence of delimiter in read buffer.
      			while (this.readPos < this.limit && delimPos < this.delimiter.length) {
      				if ((this.readBuffer[this.readPos]) == this.delimiter[delimPos]) {
      					// Found the expected delimiter character. Continue looking for the next character of delimiter.
      					delimPos++;
      				} else {
      					// Delimiter does not match.
      					// We have to reset the read position to the character after the first matching character
      					//   and search for the whole delimiter again.
      					readPos -= delimPos;
      					delimPos = 0;
      				}
      				readPos++;
      			}
      
    • On the first call, startPos = 0 and count = 0, i.e. no record data is read:
      setResult(this.readBuffer, startPos, count);
      
    protected final boolean readLine() throws IOException {
    		if (this.stream == null || this.overLimit) {
    			return false;
    		}
    
    		int countInWrapBuffer = 0;
    
    		// position of matching positions in the delimiter byte array
    		int delimPos = 0;
    
    		while (true) {
    			if (this.readPos >= this.limit) {
    				// readBuffer is completely consumed. Fill it again but keep partially read delimiter bytes.
    				if (!fillBuffer(delimPos)) {
    					int countInReadBuffer = delimPos;
    					if (countInWrapBuffer + countInReadBuffer > 0) {
    						// we have bytes left to emit
    						if (countInReadBuffer > 0) {
    							// we have bytes left in the readBuffer. Move them into the wrapBuffer
    							if (this.wrapBuffer.length - countInWrapBuffer < countInReadBuffer) {
    								// reallocate
    								byte[] tmp = new byte[countInWrapBuffer + countInReadBuffer];
    								System.arraycopy(this.wrapBuffer, 0, tmp, 0, countInWrapBuffer);
    								this.wrapBuffer = tmp;
    							}
    
    							// copy readBuffer bytes to wrapBuffer
    							System.arraycopy(this.readBuffer, 0, this.wrapBuffer, countInWrapBuffer, countInReadBuffer);
    							countInWrapBuffer += countInReadBuffer;
    						}
    
    						this.offset += countInWrapBuffer;
    						setResult(this.wrapBuffer, 0, countInWrapBuffer);
    						return true;
    					} else {
    						return false;
    					}
    				}
    			}
    
    			int startPos = this.readPos - delimPos;
    			int count;
    
    			// Search for next occurrence of delimiter in read buffer.
    			while (this.readPos < this.limit && delimPos < this.delimiter.length) {
    				if ((this.readBuffer[this.readPos]) == this.delimiter[delimPos]) {
    					// Found the expected delimiter character. Continue looking for the next character of delimiter.
    					delimPos++;
    				} else {
    					// Delimiter does not match.
    					// We have to reset the read position to the character after the first matching character
    					//   and search for the whole delimiter again.
    					readPos -= delimPos;
    					delimPos = 0;
    				}
    				readPos++;
    			}
    
    			// check why we dropped out
    			if (delimPos == this.delimiter.length) {
    				// we found a delimiter
    				int readBufferBytesRead = this.readPos - startPos;
    				this.offset += countInWrapBuffer + readBufferBytesRead;
    				count = readBufferBytesRead - this.delimiter.length;
    
    				// copy to byte array
    				if (countInWrapBuffer > 0) {
    					// check wrap buffer size
    					if (this.wrapBuffer.length < countInWrapBuffer + count) {
    						final byte[] nb = new byte[countInWrapBuffer + count];
    						System.arraycopy(this.wrapBuffer, 0, nb, 0, countInWrapBuffer);
    						this.wrapBuffer = nb;
    					}
    					if (count >= 0) {
    						System.arraycopy(this.readBuffer, 0, this.wrapBuffer, countInWrapBuffer, count);
    					}
    					setResult(this.wrapBuffer, 0, countInWrapBuffer + count);
    					return true;
    				} else {
    					setResult(this.readBuffer, startPos, count);
    					return true;
    				}
    			} else {
    				// we reached the end of the readBuffer
    				count = this.limit - startPos;
    				
    				// check against the maximum record length
    				if (((long) countInWrapBuffer) + count > this.lineLengthLimit) {
    					throw new IOException("The record length exceeded the maximum record length (" + 
    							this.lineLengthLimit + ").");
    				}
    
    				// Compute number of bytes to move to wrapBuffer
    				// Chars of partially read delimiter must remain in the readBuffer. We might need to go back.
    				int bytesToMove = count - delimPos;
    				// ensure wrapBuffer is large enough
    				if (this.wrapBuffer.length - countInWrapBuffer < bytesToMove) {
    					// reallocate
    					byte[] tmp = new byte[Math.max(this.wrapBuffer.length * 2, countInWrapBuffer + bytesToMove)];
    					System.arraycopy(this.wrapBuffer, 0, tmp, 0, countInWrapBuffer);
    					this.wrapBuffer = tmp;
    				}
    
    				// copy readBuffer to wrapBuffer (except delimiter chars)
    				System.arraycopy(this.readBuffer, startPos, this.wrapBuffer, countInWrapBuffer, bytesToMove);
    				countInWrapBuffer += bytesToMove;
    				// move delimiter chars to the beginning of the readBuffer
    				System.arraycopy(this.readBuffer, this.readPos - delimPos, this.readBuffer, 0, delimPos);
    
    			}
    		}
    	}
    

    DelimitedInputFormat.fillBuffer()

    • Read data into the buffer: if more than 1 MB remains, read 1 MB; otherwise read the rest of the current split. For split [1] (line.txt:5+4) the four bytes read are listed below, followed by a sketch of the resulting buffer state
    0 = 10
    1 = 98
    2 = 32
    3 = 99
    
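    Traced against the code below, the buffer state after this fillBuffer call for split line.txt:5+4 looks like this (a sketch of the relevant fields, not Flink code):

      // buffer state after fillBuffer for split line.txt:5+4
      val readBuffer = Array[Byte](10, 98, 32, 99) // '\n', 'b', ' ', 'c'
      val readPos = 0                              // reading starts at offset 0 of the buffer
      val limit = 4                                // 4 valid bytes in the buffer
      val splitLengthLeft = 4 - 4                  // the split is exhausted, so the next
                                                   // fillBuffer call will set overLimit = true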
    /**
    	 * Fills the read buffer with bytes read from the file starting from an offset.
    	 */
    	private boolean fillBuffer(int offset) throws IOException {
    		int maxReadLength = this.readBuffer.length - offset;
    		// special case for reading the whole split.
    		if (this.splitLength == FileInputFormat.READ_WHOLE_SPLIT_FLAG) {
    			int read = this.stream.read(this.readBuffer, offset, maxReadLength);
    			if (read == -1) {
    				this.stream.close();
    				this.stream = null;
    				return false;
    			} else {
    				this.readPos = offset;
    				this.limit = read;
    				return true;
    			}
    		}
    		
    		// else ..
    		int toRead;
    		if (this.splitLength > 0) {
    			// if we have more data, read that
    			toRead = this.splitLength > maxReadLength ? maxReadLength : (int) this.splitLength;
    		}
    		else {
    			// if we have exhausted our split, we need to complete the current record, or read one
    			// more across the next split.
    			// the reason is that the next split will skip over the beginning until it finds the first
    			// delimiter, discarding it as an incomplete chunk of data that belongs to the last record in the
    			// previous split.
    			toRead = maxReadLength;
    			this.overLimit = true;
    		}
    
    		int read = this.stream.read(this.readBuffer, offset, toRead);
    
    		if (read == -1) {
    			this.stream.close();
    			this.stream = null;
    			return false;
    		} else {
    			this.splitLength -= read;
    			this.readPos = offset; // position from where to start reading
    			this.limit = read + offset; // number of valid bytes in the read buffer
    			return true;
    		}
    	}
    

    DelimitedInputFormat.nextRecord

    • Read one record from the current split; DelimitedInputFormat.open() has already recomputed the split's actual start position
    • Call DelimitedInputFormat.readLine() to read one line of the current split:
    public OT nextRecord(OT record) throws IOException {
    		if (readLine()) {
    			return readRecord(record, this.currBuffer, this.currOffset, this.currLen);
    		} else {
    			this.end = true;
    			return null;
    		}
    	}
    

    DelimitedInputFormat.readLine()

    • DelimitedInputFormat.open() has already recomputed the split's start position, but the split length is unchanged: the length computed earlier is still used
    • Read up to the first newline, i.e. read one line; if there is no newline, read up to the maximum length of the current split:
    			// Search for next occurrence of delimiter in read buffer.
    			while (this.readPos < this.limit && delimPos < this.delimiter.length) {
    				if ((this.readBuffer[this.readPos]) == this.delimiter[delimPos]) {
    					// Found the expected delimiter character. Continue looking for the next character of delimiter.
    					delimPos++;
    				} else {
    					// Delimiter does not match.
    					// We have to reset the read position to the character after the first matching character
    					//   and search for the whole delimiter again.
    					readPos -= delimPos;
    					delimPos = 0;
    				}
    				readPos++;
    			}
    
    • Copy the bytes of the current line from readBuffer into wrapBuffer

      System.arraycopy(this.readBuffer, startPos, this.wrapBuffer, countInWrapBuffer, bytesToMove);
      
    • If a newline was read it must be stripped from the record; the (partially matched) delimiter bytes are moved to the beginning of readBuffer:

      // move delimiter chars to the beginning of the readBuffer
      				System.arraycopy(this.readBuffer, this.readPos - delimPos, this.readBuffer, 0, delimPos);
      
    • this.limit is the number of valid bytes to consume for the current buffer fill; once it has been consumed, another chunk of up to 1 MB is read into the buffer, and finally the current line is returned via setResult

    • In fillBuffer, once the current split's data has been fully read, overLimit is set to true, so the next attempt to read a line fails its precondition and nothing more is read

    if (this.readPos >= this.limit) {
    				// readBuffer is completely consumed. Fill it again but keep partially read delimiter bytes.
    				if (!fillBuffer(delimPos)) {
    					int countInReadBuffer = delimPos;
    					if (countInWrapBuffer + countInReadBuffer > 0) {
    						// we have bytes left to emit
    						if (countInReadBuffer > 0) {
    							// we have bytes left in the readBuffer. Move them into the wrapBuffer
    							if (this.wrapBuffer.length - countInWrapBuffer < countInReadBuffer) {
    								// reallocate
    								byte[] tmp = new byte[countInWrapBuffer + countInReadBuffer];
    								System.arraycopy(this.wrapBuffer, 0, tmp, 0, countInWrapBuffer);
    								this.wrapBuffer = tmp;
    							}
    
    							// copy readBuffer bytes to wrapBuffer
    							System.arraycopy(this.readBuffer, 0, this.wrapBuffer, countInWrapBuffer, countInReadBuffer);
    							countInWrapBuffer += countInReadBuffer;
    						}
    
    						this.offset += countInWrapBuffer;
    						setResult(this.wrapBuffer, 0, countInWrapBuffer);
    						return true;
    					} else {
    						return false;
    					}
    				}
    			}
    
    
    • The first record read is
    b c
    
    protected final boolean readLine() throws IOException {
    		if (this.stream == null || this.overLimit) {
    			return false;
    		}
    
    		int countInWrapBuffer = 0;
    
    		// position of matching positions in the delimiter byte array
    		int delimPos = 0;
    
    		while (true) {
    			if (this.readPos >= this.limit) {
    				// readBuffer is completely consumed. Fill it again but keep partially read delimiter bytes.
    				if (!fillBuffer(delimPos)) {
    					int countInReadBuffer = delimPos;
    					if (countInWrapBuffer + countInReadBuffer > 0) {
    						// we have bytes left to emit
    						if (countInReadBuffer > 0) {
    							// we have bytes left in the readBuffer. Move them into the wrapBuffer
    							if (this.wrapBuffer.length - countInWrapBuffer < countInReadBuffer) {
    								// reallocate
    								byte[] tmp = new byte[countInWrapBuffer + countInReadBuffer];
    								System.arraycopy(this.wrapBuffer, 0, tmp, 0, countInWrapBuffer);
    								this.wrapBuffer = tmp;
    							}
    
    							// copy readBuffer bytes to wrapBuffer
    							System.arraycopy(this.readBuffer, 0, this.wrapBuffer, countInWrapBuffer, countInReadBuffer);
    							countInWrapBuffer += countInReadBuffer;
    						}
    
    						this.offset += countInWrapBuffer;
    						setResult(this.wrapBuffer, 0, countInWrapBuffer);
    						return true;
    					} else {
    						return false;
    					}
    				}
    			}
    
    			int startPos = this.readPos - delimPos;
    			int count;
    
    			// Search for next occurrence of delimiter in read buffer.
    			while (this.readPos < this.limit && delimPos < this.delimiter.length) {
    				if ((this.readBuffer[this.readPos]) == this.delimiter[delimPos]) {
    					// Found the expected delimiter character. Continue looking for the next character of delimiter.
    					delimPos++;
    				} else {
    					// Delimiter does not match.
    					// We have to reset the read position to the character after the first matching character
    					//   and search for the whole delimiter again.
    					readPos -= delimPos;
    					delimPos = 0;
    				}
    				readPos++;
    			}
    
    			// check why we dropped out
    			if (delimPos == this.delimiter.length) {
    				// we found a delimiter
    				int readBufferBytesRead = this.readPos - startPos;
    				this.offset += countInWrapBuffer + readBufferBytesRead;
    				count = readBufferBytesRead - this.delimiter.length;
    
    				// copy to byte array
    				if (countInWrapBuffer > 0) {
    					// check wrap buffer size
    					if (this.wrapBuffer.length < countInWrapBuffer + count) {
    						final byte[] nb = new byte[countInWrapBuffer + count];
    						System.arraycopy(this.wrapBuffer, 0, nb, 0, countInWrapBuffer);
    						this.wrapBuffer = nb;
    					}
    					if (count >= 0) {
    						System.arraycopy(this.readBuffer, 0, this.wrapBuffer, countInWrapBuffer, count);
    					}
    					setResult(this.wrapBuffer, 0, countInWrapBuffer + count);
    					return true;
    				} else {
    					setResult(this.readBuffer, startPos, count);
    					return true;
    				}
    			} else {
    				// we reached the end of the readBuffer
    				count = this.limit - startPos;
    				
    				// check against the maximum record length
    				if (((long) countInWrapBuffer) + count > this.lineLengthLimit) {
    					throw new IOException("The record length exceeded the maximum record length (" + 
    							this.lineLengthLimit + ").");
    				}
    
    				// Compute number of bytes to move to wrapBuffer
    				// Chars of partially read delimiter must remain in the readBuffer. We might need to go back.
    				int bytesToMove = count - delimPos;
    				// ensure wrapBuffer is large enough
    				if (this.wrapBuffer.length - countInWrapBuffer < bytesToMove) {
    					// reallocate
    					byte[] tmp = new byte[Math.max(this.wrapBuffer.length * 2, countInWrapBuffer + bytesToMove)];
    					System.arraycopy(this.wrapBuffer, 0, tmp, 0, countInWrapBuffer);
    					this.wrapBuffer = tmp;
    				}
    
    				// copy readBuffer to wrapBuffer (except delimiter chars)
    				System.arraycopy(this.readBuffer, startPos, this.wrapBuffer, countInWrapBuffer, bytesToMove);
    				countInWrapBuffer += bytesToMove;
    				// move delimiter chars to the beginning of the readBuffer
    				System.arraycopy(this.readBuffer, this.readPos - delimPos, this.readBuffer, 0, delimPos);
    
    			}
    		}
    	}
    

    Source code analysis (actually reading a split's data) (split 2)

    DataSourceTask

    DataSourceTask.invoke()

    • The source's operator chain is ChainedFlatMapDriver, ChainedMapDriver, SynchronousChainedCombineDriver, i.e. FlatMap -> Map -> Combine (SUM(1)): every record the source reads is pushed through the operators on this chain (a minimal sketch of this chaining follows the two snippets below).

    ```
    // start all chained tasks
    BatchTask.openChainedTasks(this.chainedTasks, this);
    ```

    ```
    this.chainedTasks = {ArrayList@5459}  size = 3
    

    0 = {ChainedFlatMapDriver@5458}
    1 = {ChainedMapDriver@5505}
    2 = {SynchronousChainedCombineDriver@5506}
    ```
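
    To make the idea of the chain concrete, here is a minimal, hedged sketch (plain Java, not Flink's ChainedDriver classes; all names below are made up for illustration): the source never hands records to a network writer, it simply calls the next operator in the chain, so a single input line is flat-mapped, mapped and locally combined inside the same task.

    ```java
    import java.util.Arrays;
    import java.util.function.Consumer;

    // Illustrative only: stands in for FlatMap -> Map -> Combine (SUM(1)) chained in one task.
    public class ChainSketch {
        public static void main(String[] args) {
            Consumer<String> combine = pair -> System.out.println("combine sees: " + pair); // local pre-aggregation step
            Consumer<String> map = word -> combine.accept("(" + word + ",1)");              // Map: word -> (word, 1)
            Consumer<String> flatMap = line -> Arrays.stream(line.split(" ")).forEach(map); // FlatMap: split the line into words

            flatMap.accept("c a a"); // one record read by the source flows through the whole chain
        }
    }
    ```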

    • A split is then fetched for the current DataSourceTask. Because the source does not partition data by key, it does not matter which task processes which split: whichever task asks for work is simply handed the next split, and every split that is handed out is removed from the overall pool (see the sketch after the snippet below).

      final InputSplit split = splitIterator.next();
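
      A minimal, hedged sketch of that hand-out behaviour (plain Java, not Flink's InputSplitProvider/InputSplitAssigner API; the class name is made up): each request removes one split from a shared pool, so no split is processed twice.

      ```java
      import java.util.ArrayDeque;
      import java.util.Deque;

      // Illustrative split pool: nextSplit() hands out each split exactly once.
      public class SimpleSplitPool {
          private final Deque<String> splits = new ArrayDeque<>();

          public SimpleSplitPool(String... splitDescriptions) {
              for (String s : splitDescriptions) {
                  splits.add(s);
              }
          }

          // Returns the next unassigned split, or null when all splits have been taken.
          public synchronized String nextSplit() {
              return splits.poll();
          }

          public static void main(String[] args) {
              SimpleSplitPool pool = new SimpleSplitPool("line.txt:0+5", "line.txt:5+4");
              String split;
              while ((split = pool.nextSplit()) != null) {
                  System.out.println("assigned " + split);
              }
          }
      }
      ```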
      
    • Current split information:

          LOG.debug(getLogString("Opening input split " + split.toString()));
          
          
          
          
         15:12:01,928 DEBUG [CHAIN DataSource (at com.opensourceteams.module.bigdata.flink.example.dataset.worldcount.WordCountRun$.main(WordCountRun.scala:19) (org.apache.flink.api.java.io.TextInp) -> FlatMap (FlatMap at com.opensourceteams.module.bigdata.flink.example.dataset.worldcount.WordCountRun$.main(WordCountRun.scala:23)) -> Map (Map at com.opensourceteams.module.bigdata.flink.example.dataset.worldcount.WordCountRun$.main(WordCountRun.scala:23)) -> Combine (SUM(1)) (2/2)] org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:172)      - Starting to read input from split [0] file:/opt/n_001_workspaces/bigdata/flink/flink-maven-scala-2/src/main/resources/data/line.txt:0+5:  CHAIN DataSource (at com.opensourceteams.module.bigdata.flink.example.dataset.worldcount.WordCountRun$.main(WordCountRun.scala:19) (org.apache.flink.api.java.io.TextInp) -> FlatMap (FlatMap at com.opensourceteams.module.bigdata.flink.example.dataset.worldcount.WordCountRun$.main(WordCountRun.scala:23)) -> Map (Map at com.opensourceteams.module.bigdata.flink.example.dataset.worldcount.WordCountRun$.main(WordCountRun.scala:23)) -> Combine (SUM(1)) (2/2)
      
      
    • The current split is then processed by calling DelimitedInputFormat.open(). open() does not read any records yet; it only positions the stream: for a split with a non-zero start offset, the partial record before the first newline belongs to the previous split, so this split starts reading right after that newline.

      format.open(split);
      
    @Override
    	public void invoke() throws Exception {
    		// --------------------------------------------------------------------
    		// Initialize
    		// --------------------------------------------------------------------
    		initInputFormat();
    
    		LOG.debug(getLogString("Start registering input and output"));
    
    		try {
    			initOutputs(getUserCodeClassLoader());
    		} catch (Exception ex) {
    			throw new RuntimeException("The initialization of the DataSource's outputs caused an error: " +
    					ex.getMessage(), ex);
    		}
    
    		LOG.debug(getLogString("Finished registering input and output"));
    
    		// --------------------------------------------------------------------
    		// Invoke
    		// --------------------------------------------------------------------
    		LOG.debug(getLogString("Starting data source operator"));
    
    		RuntimeContext ctx = createRuntimeContext();
    
    		final Counter numRecordsOut;
    		{
    			Counter tmpNumRecordsOut;
    			try {
    				OperatorIOMetricGroup ioMetricGroup = ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup();
    				ioMetricGroup.reuseInputMetricsForTask();
    				if (this.config.getNumberOfChainedStubs() == 0) {
    					ioMetricGroup.reuseOutputMetricsForTask();
    				}
    				tmpNumRecordsOut = ioMetricGroup.getNumRecordsOutCounter();
    			} catch (Exception e) {
    				LOG.warn("An exception occurred during the metrics setup.", e);
    				tmpNumRecordsOut = new SimpleCounter();
    			}
    			numRecordsOut = tmpNumRecordsOut;
    		}
    		
    		Counter completedSplitsCounter = ctx.getMetricGroup().counter("numSplitsProcessed");
    
    		if (RichInputFormat.class.isAssignableFrom(this.format.getClass())) {
    			((RichInputFormat) this.format).setRuntimeContext(ctx);
    			LOG.debug(getLogString("Rich Source detected. Initializing runtime context."));
    			((RichInputFormat) this.format).openInputFormat();
    			LOG.debug(getLogString("Rich Source detected. Opening the InputFormat."));
    		}
    
    		ExecutionConfig executionConfig = getExecutionConfig();
    
    		boolean objectReuseEnabled = executionConfig.isObjectReuseEnabled();
    
    		LOG.debug("DataSourceTask object reuse: " + (objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
    		
    		final TypeSerializer<OT> serializer = this.serializerFactory.getSerializer();
    		
    		try {
    			// start all chained tasks
    			BatchTask.openChainedTasks(this.chainedTasks, this);
    			
    			// get input splits to read
    			final Iterator<InputSplit> splitIterator = getInputSplits();
    			
    			// for each assigned input split
    			while (!this.taskCanceled && splitIterator.hasNext())
    			{
    				// get start and end
    				final InputSplit split = splitIterator.next();
    
    				LOG.debug(getLogString("Opening input split " + split.toString()));
    				
    				final InputFormat<OT, InputSplit> format = this.format;
    			
    				// open input format
    				format.open(split);
    	
    				LOG.debug(getLogString("Starting to read input from split " + split.toString()));
    				
    				try {
    					final Collector<OT> output = new CountingCollector<>(this.output, numRecordsOut);
    
    					if (objectReuseEnabled) {
    						OT reuse = serializer.createInstance();
    
    						// as long as there is data to read
    						while (!this.taskCanceled && !format.reachedEnd()) {
    
    							OT returned;
    							if ((returned = format.nextRecord(reuse)) != null) {
    								output.collect(returned);
    							}
    						}
    					} else {
    						// as long as there is data to read
    						while (!this.taskCanceled && !format.reachedEnd()) {
    							OT returned;
    							if ((returned = format.nextRecord(serializer.createInstance())) != null) {
    								output.collect(returned);
    							}
    						}
    					}
    
    					if (LOG.isDebugEnabled() && !this.taskCanceled) {
    						LOG.debug(getLogString("Closing input split " + split.toString()));
    					}
    				} finally {
    					// close. We close here such that a regular close throwing an exception marks a task as failed.
    					format.close();
    				}
    				completedSplitsCounter.inc();
    			} // end for all input splits
    
    			// close the collector. if it is a chaining task collector, it will close its chained tasks
    			this.output.close();
    
    			// close all chained tasks letting them report failure
    			BatchTask.closeChainedTasks(this.chainedTasks, this);
    
    		}
    		catch (Exception ex) {
    			// close the input, but do not report any exceptions, since we already have another root cause
    			try {
    				this.format.close();
    			} catch (Throwable ignored) {}
    
    			BatchTask.cancelChainedTasks(this.chainedTasks);
    
    			ex = ExceptionInChainedStubException.exceptionUnwrap(ex);
    
    			if (ex instanceof CancelTaskException) {
    				// forward canceling exception
    				throw ex;
    			}
    			else if (!this.taskCanceled) {
    				// drop exception, if the task was canceled
    				BatchTask.logAndThrowException(ex, this);
    			}
    		} finally {
    			BatchTask.clearWriters(eventualOutputs);
    			// --------------------------------------------------------------------
    			// Closing
    			// --------------------------------------------------------------------
    			if (this.format != null && RichInputFormat.class.isAssignableFrom(this.format.getClass())) {
    				((RichInputFormat) this.format).closeInputFormat();
    				LOG.debug(getLogString("Rich Source detected. Closing the InputFormat."));
    			}
    		}
    
    		if (!this.taskCanceled) {
    			LOG.debug(getLogString("Finished data source operator"));
    		}
    		else {
    			LOG.debug(getLogString("Data source operator cancelled"));
    		}
    	}
    
    

    DelimitedInputFormat.open()

    • Calls FileInputFormat.open(split), which records the current split information (start offset and split length) and seeks the stream to the start position.
    • initBuffers(); initializes the buffers: by default the readBuffer is 1 MB and the wrapBuffer 256 bytes.
    • If splitStart > 0, DelimitedInputFormat.readLine() is called once to skip the partial first record (a small standalone sketch of this skip step follows the method listing below); if splitStart == 0, fillBuffer(0) is called directly.
    /**
    	 * Opens the given input split. This method opens the input stream to the specified file, allocates read buffers
    	 * and positions the stream at the correct position, making sure that any partial record at the beginning is skipped.
    	 *
    	 * @param split The input split to open.
    	 *
    	 * @see org.apache.flink.api.common.io.FileInputFormat#open(org.apache.flink.core.fs.FileInputSplit)
    	 */
    	@Override
    	public void open(FileInputSplit split) throws IOException {
    		super.open(split);
    		initBuffers();
    
    		this.offset = splitStart;
    		if (this.splitStart != 0) {
    			this.stream.seek(offset);
    			readLine();
    			// if the first partial record already pushes the stream over
    			// the limit of our split, then no record starts within this split
    			if (this.overLimit) {
    				this.end = true;
    			}
    		} else {
    			fillBuffer(0);
    		}
    	}
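
    A hedged, standalone illustration of that skip step (plain Java over a byte array, not Flink's FSDataInputStream; the class and method names are made up): for a split that does not start at offset 0, everything up to and including the first delimiter is discarded, and the split's first complete record starts right after it.

    ```java
    // Illustrative only: mimics what open() plus the initial readLine() achieve for splitStart > 0.
    public class SkipSketch {
        static int firstRecordStart(byte[] data, int splitStart, byte delimiter) {
            if (splitStart == 0) {
                return 0;                              // the first split owns its first record
            }
            int pos = splitStart;
            while (pos < data.length && data[pos] != delimiter) {
                pos++;                                 // skip the tail of the previous split's record
            }
            return Math.min(pos + 1, data.length);     // first byte after the delimiter
        }

        public static void main(String[] args) {
            byte[] file = "c a a\nb c".getBytes(java.nio.charset.StandardCharsets.UTF_8);
            // the second split of the example starts at offset 5, which is exactly the '\n'
            System.out.println(firstRecordStart(file, 5, (byte) '\n')); // prints 6, the offset of 'b'
        }
    }
    ```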
    

    FileInputFormat.open(split)

    • Sets the current split:

      this.currentSplit = fileSplit;
      
    • Sets the start offset of the current split:

      this.splitStart = fileSplit.getStart();
      
    • Sets the length of the current split:

      this.splitLength = fileSplit.getLength();
      
    • Seeks the stream to the start offset:

      	// get FSDataInputStream
      		if (this.splitStart != 0) {
      			this.stream.seek(this.splitStart);
      		}
      
    /**
    	 * Opens an input stream to the file defined in the input format.
    	 * The stream is positioned at the beginning of the given split.
    	 * <p>
    	 * The stream is actually opened in an asynchronous thread to make sure any interruptions to the thread 
    	 * working on the input format do not reach the file system.
    	 */
    	@Override
    	public void open(FileInputSplit fileSplit) throws IOException {
    
    		this.currentSplit = fileSplit;
    		this.splitStart = fileSplit.getStart();
    		this.splitLength = fileSplit.getLength();
    
    		if (LOG.isDebugEnabled()) {
    			LOG.debug("Opening input split " + fileSplit.getPath() + " [" + this.splitStart + "," + this.splitLength + "]");
    		}
    
    		
    		// open the split in an asynchronous thread
    		final InputSplitOpenThread isot = new InputSplitOpenThread(fileSplit, this.openTimeout);
    		isot.start();
    		
    		try {
    			this.stream = isot.waitForCompletion();
    			this.stream = decorateInputStream(this.stream, fileSplit);
    		}
    		catch (Throwable t) {
    			throw new IOException("Error opening the Input Split " + fileSplit.getPath() + 
    					" [" + splitStart + "," + splitLength + "]: " + t.getMessage(), t);
    		}
    		
    		// get FSDataInputStream
    		if (this.splitStart != 0) {
    			this.stream.seek(this.splitStart);
    		}
    	}
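
    For completeness, the effect of that seek, sketched with a plain RandomAccessFile instead of Flink's FSDataInputStream (the local file path is an assumption): for the second split of the example (start = 5), the first byte returned is the delimiter '\n' (10).

    ```java
    import java.io.IOException;
    import java.io.RandomAccessFile;

    // Illustrative only: positions the stream at the split's start offset before reading.
    public class SeekSketch {
        public static void main(String[] args) throws IOException {
            try (RandomAccessFile in = new RandomAccessFile("line.txt", "r")) { // hypothetical local copy of the input file
                in.seek(5);                      // splitStart of the second split
                System.out.println(in.read());   // 10, i.e. '\n', for the example input
            }
        }
    }
    ```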
    
    

    DelimitedInputFormat.initBuffers

    • The default readBuffer size is 1 MB; the wrapBuffer is 256 bytes.

    • Default state for the current split:

      	this.readPos = 0;
      		this.limit = 0;
      		this.overLimit = false;
      		this.end = false;
      
    private void initBuffers() {
    		this.bufferSize = this.bufferSize <= 0 ? DEFAULT_READ_BUFFER_SIZE : this.bufferSize;
    
    		if (this.bufferSize <= this.delimiter.length) {
    			throw new IllegalArgumentException("Buffer size must be greater than length of delimiter.");
    		}
    
    		if (this.readBuffer == null || this.readBuffer.length != this.bufferSize) {
    			this.readBuffer = new byte[this.bufferSize];
    		}
    		if (this.wrapBuffer == null || this.wrapBuffer.length < 256) {
    			this.wrapBuffer = new byte[256];
    		}
    
    		this.readPos = 0;
    		this.limit = 0;
    		this.overLimit = false;
    		this.end = false;
    	}
    

    DelimitedInputFormat.fillBuffer()

    • Reads data into the read buffer: if more than 1 MB of the split remains, 1 MB is read; otherwise the rest of the current split is read in full (a small worked example follows the method listing below). For the second split of the example input, the buffer then contains:
    0 = 10
    1 = 98
    2 = 32
    3 = 99
    
    /**
    	 * Fills the read buffer with bytes read from the file starting from an offset.
    	 */
    	private boolean fillBuffer(int offset) throws IOException {
    		int maxReadLength = this.readBuffer.length - offset;
    		// special case for reading the whole split.
    		if (this.splitLength == FileInputFormat.READ_WHOLE_SPLIT_FLAG) {
    			int read = this.stream.read(this.readBuffer, offset, maxReadLength);
    			if (read == -1) {
    				this.stream.close();
    				this.stream = null;
    				return false;
    			} else {
    				this.readPos = offset;
    				this.limit = read;
    				return true;
    			}
    		}
    		
    		// else ..
    		int toRead;
    		if (this.splitLength > 0) {
    			// if we have more data, read that
    			toRead = this.splitLength > maxReadLength ? maxReadLength : (int) this.splitLength;
    		}
    		else {
    			// if we have exhausted our split, we need to complete the current record, or read one
    			// more across the next split.
    			// the reason is that the next split will skip over the beginning until it finds the first
    			// delimiter, discarding it as an incomplete chunk of data that belongs to the last record in the
    			// previous split.
    			toRead = maxReadLength;
    			this.overLimit = true;
    		}
    
    		int read = this.stream.read(this.readBuffer, offset, toRead);
    
    		if (read == -1) {
    			this.stream.close();
    			this.stream = null;
    			return false;
    		} else {
    			this.splitLength -= read;
    			this.readPos = offset; // position from where to start reading
    			this.limit = read + offset; // number of valid bytes in the read buffer
    			return true;
    		}
    	}
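
    A hedged, worked example of fillBuffer's size decision for the second split of the example input (4 bytes left in the split, a 1 MB read buffer): only the 4 remaining bytes are read; once splitLength reaches 0, the next call reads across the split boundary and flags overLimit. The class name is made up for illustration.

    ```java
    // Illustrative only: reproduces the toRead / overLimit decision shown above.
    public class FillBufferSketch {
        public static void main(String[] args) {
            long splitLength = 4;                 // bytes still belonging to this split
            int maxReadLength = 1024 * 1024;      // free space in the 1 MB readBuffer
            boolean overLimit = false;

            int toRead;
            if (splitLength > 0) {
                toRead = splitLength > maxReadLength ? maxReadLength : (int) splitLength;
            } else {
                toRead = maxReadLength;           // finish the current record from the next split's bytes
                overLimit = true;
            }
            System.out.println("toRead=" + toRead + ", overLimit=" + overLimit); // toRead=4, overLimit=false
        }
    }
    ```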
    

    DelimitedInputFormat.nextRecord

    • Reads one record (one line) from the current split; DelimitedInputFormat.open() has already adjusted the effective start position of the split.
    • Calls DelimitedInputFormat.readLine() to read one line of the current split.
    public OT nextRecord(OT record) throws IOException {
    		if (readLine()) {
    			return readRecord(record, this.currBuffer, this.currOffset, this.currLen);
    		} else {
    			this.end = true;
    			return null;
    		}
    	}
    

    DelimitedInputFormat.readLine()

    • DelimitedInputFormat.open() has already adjusted the start position of the split, but the split length stays unchanged, so the originally computed length is still used.
    • Reads up to the first delimiter (newline), i.e. one line; if no delimiter is found, reading continues up to the maximum length of the current split:
    			// Search for next occurrence of delimiter in read buffer.
    			while (this.readPos < this.limit && delimPos < this.delimiter.length) {
    				if ((this.readBuffer[this.readPos]) == this.delimiter[delimPos]) {
    					// Found the expected delimiter character. Continue looking for the next character of delimiter.
    					delimPos++;
    				} else {
    					// Delimiter does not match.
    					// We have to reset the read position to the character after the first matching character
    					//   and search for the whole delimiter again.
    					readPos -= delimPos;
    					delimPos = 0;
    				}
    				readPos++;
    			}
    
    • Copies the bytes of the current line from the readBuffer into the wrapBuffer:

      System.arraycopy(this.readBuffer, startPos, this.wrapBuffer, countInWrapBuffer, bytesToMove);
      
    • The delimiter itself must not end up in the record: any (partially) matched delimiter bytes are moved back to the beginning of the readBuffer:

      // move delimiter chars to the beginning of the readBuffer
      				System.arraycopy(this.readBuffer, this.readPos - delimPos, this.readBuffer, 0, delimPos);
      
    • this.limit is the number of valid bytes currently buffered for this split; once they are consumed, another chunk of up to 1 MB is read into the buffer, and finally the current line is returned via setResult.

    • In fillBuffer(), once the current split's data has been fully read, overLimit is set to true, so the next readLine() call fails its entry check and reads nothing more.

    if (this.readPos >= this.limit) {
    				// readBuffer is completely consumed. Fill it again but keep partially read delimiter bytes.
    				if (!fillBuffer(delimPos)) {
    					int countInReadBuffer = delimPos;
    					if (countInWrapBuffer + countInReadBuffer > 0) {
    						// we have bytes left to emit
    						if (countInReadBuffer > 0) {
    							// we have bytes left in the readBuffer. Move them into the wrapBuffer
    							if (this.wrapBuffer.length - countInWrapBuffer < countInReadBuffer) {
    								// reallocate
    								byte[] tmp = new byte[countInWrapBuffer + countInReadBuffer];
    								System.arraycopy(this.wrapBuffer, 0, tmp, 0, countInWrapBuffer);
    								this.wrapBuffer = tmp;
    							}
    
    							// copy readBuffer bytes to wrapBuffer
    							System.arraycopy(this.readBuffer, 0, this.wrapBuffer, countInWrapBuffer, countInReadBuffer);
    							countInWrapBuffer += countInReadBuffer;
    						}
    
    						this.offset += countInWrapBuffer;
    						setResult(this.wrapBuffer, 0, countInWrapBuffer);
    						return true;
    					} else {
    						return false;
    					}
    				}
    			}
    
    
    • The first record read here is:
    b c
    
    protected final boolean readLine() throws IOException {
    		if (this.stream == null || this.overLimit) {
    			return false;
    		}
    
    		int countInWrapBuffer = 0;
    
    		// position of matching positions in the delimiter byte array
    		int delimPos = 0;
    
    		while (true) {
    			if (this.readPos >= this.limit) {
    				// readBuffer is completely consumed. Fill it again but keep partially read delimiter bytes.
    				if (!fillBuffer(delimPos)) {
    					int countInReadBuffer = delimPos;
    					if (countInWrapBuffer + countInReadBuffer > 0) {
    						// we have bytes left to emit
    						if (countInReadBuffer > 0) {
    							// we have bytes left in the readBuffer. Move them into the wrapBuffer
    							if (this.wrapBuffer.length - countInWrapBuffer < countInReadBuffer) {
    								// reallocate
    								byte[] tmp = new byte[countInWrapBuffer + countInReadBuffer];
    								System.arraycopy(this.wrapBuffer, 0, tmp, 0, countInWrapBuffer);
    								this.wrapBuffer = tmp;
    							}
    
    							// copy readBuffer bytes to wrapBuffer
    							System.arraycopy(this.readBuffer, 0, this.wrapBuffer, countInWrapBuffer, countInReadBuffer);
    							countInWrapBuffer += countInReadBuffer;
    						}
    
    						this.offset += countInWrapBuffer;
    						setResult(this.wrapBuffer, 0, countInWrapBuffer);
    						return true;
    					} else {
    						return false;
    					}
    				}
    			}
    
    			int startPos = this.readPos - delimPos;
    			int count;
    
    			// Search for next occurrence of delimiter in read buffer.
    			while (this.readPos < this.limit && delimPos < this.delimiter.length) {
    				if ((this.readBuffer[this.readPos]) == this.delimiter[delimPos]) {
    					// Found the expected delimiter character. Continue looking for the next character of delimiter.
    					delimPos++;
    				} else {
    					// Delimiter does not match.
    					// We have to reset the read position to the character after the first matching character
    					//   and search for the whole delimiter again.
    					readPos -= delimPos;
    					delimPos = 0;
    				}
    				readPos++;
    			}
    
    			// check why we dropped out
    			if (delimPos == this.delimiter.length) {
    				// we found a delimiter
    				int readBufferBytesRead = this.readPos - startPos;
    				this.offset += countInWrapBuffer + readBufferBytesRead;
    				count = readBufferBytesRead - this.delimiter.length;
    
    				// copy to byte array
    				if (countInWrapBuffer > 0) {
    					// check wrap buffer size
    					if (this.wrapBuffer.length < countInWrapBuffer + count) {
    						final byte[] nb = new byte[countInWrapBuffer + count];
    						System.arraycopy(this.wrapBuffer, 0, nb, 0, countInWrapBuffer);
    						this.wrapBuffer = nb;
    					}
    					if (count >= 0) {
    						System.arraycopy(this.readBuffer, 0, this.wrapBuffer, countInWrapBuffer, count);
    					}
    					setResult(this.wrapBuffer, 0, countInWrapBuffer + count);
    					return true;
    				} else {
    					setResult(this.readBuffer, startPos, count);
    					return true;
    				}
    			} else {
    				// we reached the end of the readBuffer
    				count = this.limit - startPos;
    				
    				// check against the maximum record length
    				if (((long) countInWrapBuffer) + count > this.lineLengthLimit) {
    					throw new IOException("The record length exceeded the maximum record length (" + 
    							this.lineLengthLimit + ").");
    				}
    
    				// Compute number of bytes to move to wrapBuffer
    				// Chars of partially read delimiter must remain in the readBuffer. We might need to go back.
    				int bytesToMove = count - delimPos;
    				// ensure wrapBuffer is large enough
    				if (this.wrapBuffer.length - countInWrapBuffer < bytesToMove) {
    					// reallocate
    					byte[] tmp = new byte[Math.max(this.wrapBuffer.length * 2, countInWrapBuffer + bytesToMove)];
    					System.arraycopy(this.wrapBuffer, 0, tmp, 0, countInWrapBuffer);
    					this.wrapBuffer = tmp;
    				}
    
    				// copy readBuffer to wrapBuffer (except delimiter chars)
    				System.arraycopy(this.readBuffer, startPos, this.wrapBuffer, countInWrapBuffer, bytesToMove);
    				countInWrapBuffer += bytesToMove;
    				// move delimiter chars to the beginning of the readBuffer
    				System.arraycopy(this.readBuffer, this.readPos - delimPos, this.readBuffer, 0, delimPos);
    
    			}
    		}
    	}
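
    Putting the pieces together, here is a hedged, self-contained sketch (plain Java over a byte array, not Flink code; the class name is made up) of the record-to-split rule this walkthrough arrives at: a split with start > 0 first discards everything up to and including the first '\n' at or after its start offset, and a split keeps emitting records until the record it is reading reaches or crosses the split end; that last record still belongs to it and is completed from the next split's bytes.

    ```java
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;

    // Illustrative only: reproduces the split-reading behaviour for the 9-byte example file.
    public class SplitReadSketch {

        static List<String> readSplit(byte[] file, int start, int length) {
            List<String> records = new ArrayList<>();
            int pos = start;
            int splitEnd = start + length;
            boolean overLimit = false;

            if (start > 0) {
                // skip the partial record that belongs to the previous split
                while (pos < file.length && file[pos] != '\n') pos++;
                if (pos >= splitEnd) overLimit = true;   // the skipped record covers the whole split
                pos++;                                    // move past the delimiter
            }

            while (!overLimit && pos < file.length) {
                int recordStart = pos;
                while (pos < file.length && file[pos] != '\n') pos++;
                if (pos >= splitEnd) overLimit = true;    // this record reached or crossed the split end
                records.add(new String(file, recordStart, pos - recordStart, StandardCharsets.UTF_8));
                pos++;                                    // skip the delimiter (or step past EOF)
            }
            return records;
        }

        public static void main(String[] args) {
            byte[] file = "c a a\nb c".getBytes(StandardCharsets.UTF_8); // 9 bytes, one '\n'
            System.out.println("split 0+5 -> " + readSplit(file, 0, 5)); // [c a a]
            System.out.println("split 5+4 -> " + readSplit(file, 5, 4)); // [b c]
        }
    }
    ```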
    

    end


    Loading local tile data with Tomcat + AMap (高德地图)

    Result

    [screenshot: the map rendered from the local tile data]

    Prerequisites

    1. Tomcat 8+
    2. Tile data (EPSG:3857 Web Mercator)
       [screenshots of the prepared tile data]

    Steps

    1. Place the tile data (already in the configured coordinate system) in a known directory, e.g. F:/arcgisData.
    2. Open tomcat -> conf -> server.xml, add the following configuration inside the Host element, and save the file.
    <Context path="/file" docBase="F:\arcgisData" reloadable="true" debug="0"/>
    <!-- Attributes of the <Context> element:
         path:       the URL path under which this web application is accessed.
         docBase:    the file path of the web application: an absolute path, or a path relative to the Host's appBase. For an exploded directory it is the application's root directory; for a war file it is the path of the war file (here it simply points at the tile directory).
         reloadable: if true, Tomcat watches WEB-INF/classes and WEB-INF/lib for changed class files and reloads the application automatically.
         debug:      debug logging level; 0 gives the least information, 9 the most. -->
    
    3. Restart Tomcat and load the map.
       Example page for loading the map:
    <!doctype html>
    <html>
    <head>
        <meta charset="utf-8">
        <meta http-equiv="X-UA-Compatible" content="IE=edge">
        <meta name="viewport" content="initial-scale=1.0, user-scalable=no, width=device-width">
        <title>loadmap</title>
        <link rel="stylesheet" href="https://a.amap.com/jsapi_demos/static/demo-center/css/demo-center.css" type="text/css">
    <script type="text/javascript" src="http://webapi.amap.com/maps?v=1.4.9&key=YOUR_KEY&plugin=AMap.MouseTool,AMap.PolyEditor,AMap.CircleEditor,AMap.Marker"></script>
        <script type="text/javascript" src="https://cache.amap.com/lbs/static/addToolbar.js"></script>
        <style>
          html,body,#container{
            height: 100%
          }
        </style>
    </head>
    <body>
    <div id="container"></div>
    <script type="text/javascript">
    // initialize the map object and load the map
        var map = new AMap.Map("container", {
            resizeEnable: true,
            zoom: 13
        });
        
        var googleSWLayer = new AMap.TileLayer({
            getTileUrl: function (x, y, z) {
                var oo = "00000000";
                z = "L" + z;
                var xx = x.toString(16);
                x = "C" + oo.substring(0, 8 - xx.length) + xx;
            var yy = (y).toString(16); // note: the calculation method changed here
                y = "R" + oo.substring(0, 8 - yy.length) + yy;
              // example tile URL for testing:
              // http://127.0.0.1:8181/file/_alllayers/L11/R0000037c/C00000689.png
                return 'http://127.0.0.1:8181/file/_alllayers/' + z + '/' + y + '/' + x + '.png';
            },
            zIndex: 100
        });
    // add the tile layer to the map
        googleSWLayer.setMap(map);
    
    </script>
    </body>
    </html>
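
The getTileUrl callback above only rebuilds the folder and file names of an exploded ArcGIS tile cache. A hedged Java sketch of the same path construction (level as a two-digit decimal, row and column as 8-digit lowercase hex, matching the test URL in the comment; the class name is made up):

```java
// Illustrative only: builds _alllayers/L<level>/R<row hex>/C<col hex>.png for an exploded ArcGIS cache.
public class TilePath {
    static String tilePath(int x, int y, int z) {
        return String.format("_alllayers/L%02d/R%08x/C%08x.png", z, y, x);
    }

    public static void main(String[] args) {
        // matches the test URL above: _alllayers/L11/R0000037c/C00000689.png
        System.out.println(tilePath(0x689, 0x37c, 11));
    }
}
```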
    
  • Question: how do you reproject tile data? Reposted from: https://www.cnblogs.com/feedback/p/5440822.html
  • The AMap data download tool is a free piece of software for downloading AMap POI data, road data and tile data. It automatically exports the downloaded data as SHP files and corrects the coordinates of the vector data (output coordinate system: WGS84). If you have other data-download needs or suggestions, please...
  • Given the huge volume and complex anatomy of Visible Human slice data, a binary-tree SVM multi-class method for segmenting the slice images is studied. Compared with other SVM multi-class approaches, the binary-tree method better matches how people actually segment the slice images and achieves higher...
  • ArcGIS for Silverlight loading Google Maps tile data; very rough, done by following an example.
  • Original title: [A quick tip] How to convert small-area online map tile data into GIS vector data? [City Data Analyst special training camp, September, Beijing, enrolling now] The two camps above, camp 2 and camp 3, are premium big-data courses taught by this article's author, senior city data analyst Zhang Haiping; camp members are welcome...
  • This section mainly covers the tile data format used by GeoWebCache. 1 Overview: in the GeoServer world you can symbolize data with uDig or QGIS, import the resulting SLD style file into GeoServer, and publish the data...
  • MBTiles tile data standard

    2020-12-10 10:36:22
    Tiles creation SQL: CREATE TABLE tiles (zoom_level integer, tile_column integer, tile_row integer, tile_data blob); Description: there must be a table named tiles (it may be a view) ... tile spec name required required required required
  • Starting with ArcGIS 10.3 there is a new way of managing tile caches: the cache index information (.bundlx) is included in the cached tile... /// Reads the tile for the given level/row/column from a local tile file, for the ArcGIS v2.0 tile cache format /// </summary> ...
  • 1. Slices: a slice is a reference to an array, so slices are a reference type. func main() { var arr = [6]int{1, 2, 3, 4, 5} var slice = arr[1:] fmt.Println(arr) fmt.Println(slice) fmt.Printf("%p\n", &arr[1])...
  • A tile service published with ArcGIS Server where the tile data has to be updated separately later. Administrator permission problems made the update impossible; even a file shredder could not remove the files, and adding the everyone user did not help. Right-click the folder, choose "Properties", then "Security",...
  • Personally verified: when the data contains multipart features, tiling may not work properly. Solution: convert the data to single-part features. 1. Export from QGIS, forcing conversion to single-part. 2. Check whether the database features are single-part and connect to the database layer directly (instead of shp) ...
  • Prerequisite: ArcGIS tiles must come from ArcGIS Server 10.2 or later. 1. Copy the GeoWebCache war into the webapps directory of GeoServer's Tomcat to complete the GeoWebCache installation (see online material for installing GeoServer). 2. Configure the cache path as shown below; if the node already exists,...
  • ArcGIS Server can publish vector data as WMS services, tile them accordingly and publish WMTS services, so that clients can access the data as tiles. ArcMap can connect to ArcGIS Server directly to publish data; under File choose...
  • 1. Dataset: the FLIR ADAS dataset, in COCO format.
  • Example 1: usage of slice. load mri D = double(squeeze(D)); D(D==0)=nan; h = slice(D, [], [], 1:size(D,3)); set(h, 'EdgeColor','none', 'FaceColor','interp') alpha(.4) ...[x,y,z] = meshgrid(-1.25:.1:-.25,-2:.2:2...
  • ArcGIS tile caches currently come in three main formats: the exploded format, the compact format and the compact 2.0 format. 2 Exploded format: the folder for the first scale level is named L00, the second L01, and so on. The scale folder (the following...
  • The DataFrame type is made up of ... that share a common index. DataFrames are typically used for two-dimensional data but can represent higher-dimensional data. They can be created from: (1) a 2-D ndarray; (2) a dict of 1-D ndarrays, lists, dicts, tuples or Series; (3) a Series; (4)...
  • Lately I have often used a Google Maps downloader to download satellite imagery; it can export the downloaded Google imagery directly as ArcGIS Server tiles. This is actually fairly easy to implement: just rename the Google tile row/column numbering to the ArcGIS Server tile naming scheme,...
  • Publishing an ArcGIS map service (tiles) with GeoServer actually goes through GeoWebCache. Although GeoServer 2.x is said to ship with GeoWebCache built in, it does not work well together with the ArcGIS tooling, so a standalone GeoWebCache is still needed. Download...
  • 3. To publish static terrain tile data, place the tile files in the html folder of the nginx-1.17.1 installation directory and configure as above. 4. Start nginx: open a DOS prompt, change to the nginx directory and run "start nginx.exe". 5. Verify: (1...
  • In a recent project, ArcGIS products had to be abandoned for cost reasons, so the (imagery) tile services published with ArcGIS Server could not be used directly and the generated cache tiles would have been wasted; also wanting to save effort, the main development environment was: 1. OpenLayers + ArcGIS for Server 10.2 /...
  • ArcGIS Mobile 10 adds the namespace ESRI.ArcGIS.Mobile.DataProducts.RasterData, which provides the classes for displaying raster data. Below we look at how to use these classes to load raster data in Mobile. The first raster data to experiment with is...
  • Copy the ArcGIS Server tile cache files to the root directory of the Android device's SD card. 2. Create a new project in Eclipse; the core code is as follows: public class LocalTiledLayer extends Activity { MapView map =...
  • Data splits (Split): the input data of Map tasks is split logically. 2.1.2 Why split? Splitting the input file into several splits lets the Map phase run in parallel, which speeds up the job; one data split corresponds to one MapTask. 2.1.3 The file split mech...
