精华内容
下载资源
问答
  • transformation

    2018-04-28 14:16:24
    本篇文档介绍了我们在生产中常用的transformation和action
  • Transformation篇:Android 玩转Glide4—Transformation篇 概述 再基础篇和进阶篇中,我们简单介绍了Glide4的用法,和一些进阶的使用。 本篇Transformation转换篇,将给大家介绍Glide4强大的转换功能。 Glide自带的...
  • 二维仿射变换矩阵 ... 它主要用于需要跟踪或创建变换并希望将其永久/手动应用于您自己的点和... $ git clone https://github.com/epistemex/transformation-matrix-js.git git 通过 SSH: $ git clone git@github.com
  • 利用kettle运行transformation,完成数据库表到数据库表的ETL过程.mp4ETL工具Kettle研究-1-ETL认识与Kettle研读 ETL工具Kettle研究-2-Kettle安装部署 ETL工具Kettle研究-3-MySQL数据导入HIVE ETL工具Kettle研究-4-...
  • WINDOWS仿真MAC的桌面和UI,仿真度高,外观优质,占用内存一般。
  • SST(奇异频谱变换) python的奇异频谱变换的快速实现。 什么是SST? 变更点检测算法。 查看更多 特征 快速计算 Lanczos方法的高效算法 ... $pip install fastsst ... from fastsst import ...
  • 使用此函数对输入信号执行 Park_transformation。 它与 simulink 块 Park_transformation 中的相同
  • 利用kettle运行transformation,完成从Excel表到数据库表的ETL过程ETL工具Kettle研究-1-ETL认识与Kettle研读 ETL工具Kettle研究-2-Kettle安装部署 ETL工具Kettle研究-3-MySQL数据导入HIVE ETL工具Kettle研究-4-...
  • 该系统的基本目标是在封面图像后面隐藏重要或秘密图像。 要执行此操作,使用的过程是 DES 加密和解密以及基于 DCT 的嵌入。
  • 运营转型 ot 的一个示例实现,以确保我正确理解它。 可以在和找到有用的资源。 OT.js 提供了一个很好的可视化。
  • 数字化转型之战(winning-strategy-digital-transformation.pdf)
  • fab-transformation.zip

    2019-07-30 11:31:28
    fab-transformation.zip,太多无法一一验证是否可用,程序如果跑不起来需要自调,部分代码功能进行参考学习。
  • 早在十多年前,一些具有前瞻视野的企业以实现“数字化”为目标启动转型实践。但时至今日,可以说尚无家企业能够在真正意义上实现“数字化”。在实现“数字化”的征途上,人们发现,努力愈进,仿佛终点愈远。...
  • This paper reports a detection method of two-dimensional (2D) enhancement and three-dimensional (3D) reconstruction for subtle traces with reflectance transformation imaging, which can effectively ...
  • Transformation Routines

    2019-03-06 01:27:03
    Tansformation Routines Relevant How to documents 博文链接:https://jgtang82.iteye.com/blog/189157
  • 基于Brill算法在Java中的重新实现,用于训练和部署文本处理应用程序(例如词性标记器)的Java应用程序。
  • 简单介绍Jordan-Wigner变换
  • 1.rotate旋转旋转图片,单位deg,为“度”的意思 CSS Code复制内容到剪贴板 -moz-transform: rotate(20deg); -webkit-transform: rotate(20deg); -o-transform: rotate(20deg); -ms-transform: rotate(20deg...
  • 为了帮助您评估现有项目的状态,ODM Transformation Advisor应用程序(OTA)应用程序对在Decision Center存储库中找到的规则项目和决策服务执行了一系列的健全性检查。 特别是,它将查找并标记不赞成使用的功能以及...
  • •COVID-19 Impact •Growing Opportunities –and Threats •Rethinking Data Governance & Management •Data Management & Maturity Model (DMM) •2020 State of Cybersecurity: Threats and Skills Gaps ...
  • The point clouds scanned by a 3D laser scanner may be affine transformed when the size and posture of the objects being scanned are different. This type of problem is common, but few algorithms can ...
  • 使用此函数对输入信号执行 Clarke_transformation_inverse,就像在 simulink 中的 Clarke_transformation_inverse 块中一样
  • 蛋白质的转型报告 -food for thought - the protein transformation.pdf
  • 图像处理中灰度变换的matlab代码 幂次变换,次数小于1时,增强低灰度,减弱高灰度;次数大于1时增强高灰度,减弱低灰度。分段线性变换 改善对比度 将(f1,f2)灰度扩展到(g1,g2)
  • yuv420_transformation.zip

    2020-05-08 15:24:07
    读取BMP图片,转换为RGB与YU12(YUV420P)格式,同时YU12、NV12、NV21之间相互转换
  • matlab开发-coordinateTransformation。多普勒效应测量装置的坐标变换。
  • 雅可比矩阵的意义在于它代表了从物理空间到计算空间的映射的有效性。 为了使映射有效,Jacobian 应该在整个域中具有相同的符号,即对于从右手坐标系到右手坐标系的变换,Jacobian 应该大于零。 当雅可比变为零时,...
  • Park Transformation 派克变换,静止三相坐标系abc变换到同步旋转坐标系dq0
  • burrow wheeler transformation, algorithm in sequence matching
  • * * @param name The name of the {@code Transformation}, this will be shown in Visualizations and * the Log * @param outputType The output type of this {@code Transformation} * @param parallelism The ...

    一 .前言

    {@code Transformation} 代表创建DataStream 的操作.
    每个数据流都有一个底层的{@code Transformation},它是所述数据流的起源。

    类似DataStream#map 这样的API操作在底层都会构建一个 {@code Transformation}s 树 .

    当 stream 程序执行的时候都会将这个graph使用StreamGraphGenerator 生成一个StreamGraph

    {@code Transformation}不一定对应于运行时的物理操作。

    一些操作仅仅是逻辑上的概念

     Source              Source
         +                   +
         |                   |
         v                   v
     Rebalance          HashPartition
         +                   +
         |                   |
         |                   |
         +------>Union<------+
                   +
                   |
                   v
                 Split
                   +
                   |
                   v
                 Select
                   +
                   v
                  Map
                   +
                   |
                   v
                 Sink
    

    将在运行时生成此操作图:

    Source              Source
      +                   +
      |                   |
      |                   |
      +------->Map<-------+
                +
                |
                v
               Sink
    

    partitioning, union, split/select 之类的操作最终都会被编码在连接map算子的edges中.

    二 .代码相关

    2.1. 属性

    这里定义了Transformation id ,以及相关资源的属性定义…

    
        // 32768
        // Has to be equal to StreamGraphGenerator.UPPER_BOUND_MAX_PARALLELISM
        public static final int UPPER_BOUND_MAX_PARALLELISM = 1 << 15;
    
        // 分配 唯一ID 使用
        // This is used to assign a unique ID to every Transformation
        protected static Integer idCounter = 0;
    
        public static int getNewNodeId() {
            idCounter++;
            return idCounter;
        }
    
        protected final int id;
    
        protected String name;
    
    
        // 输出类型通过TypeInformation类封装,
        // 用来生成序列化用的serializers和比较大小用的comparators,以及进行一些类型检查。
        protected TypeInformation<T> outputType;
    
    
        // 用于处理MissingTypeInfo。
        //
        // This is used to handle MissingTypeInfo.
        //
        // As long as the outputType has not been queried
        // it can still be changed using setOutputType().
        //
        // Afterwards an exception is thrown when trying to change the output type.
        protected boolean typeUsed;
    
        // 并行度
        private int parallelism;
    
        /**
         * The maximum parallelism for this stream transformation.
         *
         * It defines the upper limit for dynamic scaling and the number of key groups used for partitioned state.
         */
        private int maxParallelism = -1;
    
        /**
         * The minimum resources for this stream transformation. It defines the lower limit for dynamic
         * resources resize in future plan.
         */
        private ResourceSpec minResources = ResourceSpec.DEFAULT;
    
        /**
         * 此stream转换的首选资源。
         * 它定义了未来计划中动态资源调整的上限。
         *
         *
         * The preferred resources for this stream transformation.
         *
         * It defines the upper limit for dynamic resource resize in future plan.
         */
        private ResourceSpec preferredResources = ResourceSpec.DEFAULT;
    
        /**
         *
         *
         * Each entry in this map represents a operator scope use case that this transformation needs managed memory for.
         *
         * The keys indicate the use cases, while the values are the
         * use-case-specific weights for this transformation.
         *
         *
         * Managed memory reserved for a use case
         * will be shared by all the declaring transformations within a slot according to this weight.
         */
        private final Map<ManagedMemoryUseCase, Integer> managedMemoryOperatorScopeUseCaseWeights =  new HashMap<>();
    
        /** Slot scope use cases that this transformation needs managed memory for. */
        private final Set<ManagedMemoryUseCase> managedMemorySlotScopeUseCases = new HashSet<>();
    
        /**
         * transformation 指定的 User-specified ID
         * job重启时依旧使用相同的 operator ID.
         * 使用内部的 静态的counter 自动生成id ,
         *
         * User-specified ID for this transformation. This is used to assign the same operator ID across
         * job restarts.
         *
         * There is also the automatically generated {@link #id}, which is assigned from a static counter. That field is independent from this.
         */
        private String uid;
    
        private String userProvidedNodeHash;
    
        // 超时相关..
        protected long bufferTimeout = -1;
    
        // slot sharing 组..
        private String slotSharingGroup;
    
    
        @Nullable private String coLocationGroupKey;
    
    

    2.2. 构造方法

    只有一个构造方法, 根据name , 输出类型, 并行度构建Transformation.

        /**
         * 根据name , 输出类型, 并行度构建Transformation
         * Creates a new {@code Transformation} with the given name, output type and parallelism.
         *
         * @param name The name of the {@code Transformation}, this will be shown in Visualizations and
         *     the Log
         * @param outputType The output type of this {@code Transformation}
         * @param parallelism The parallelism of this {@code Transformation}
         */
        public Transformation(String name, TypeInformation<T> outputType, int parallelism) {
            this.id = getNewNodeId();
            this.name = Preconditions.checkNotNull(name);
            this.outputType = outputType;
            this.parallelism = parallelism;
            this.slotSharingGroup = null;
        }
    
    

    2.3. 资源相关

    名称含义
    setResources(ResourceSpec minResources, ResourceSpec preferredResources)为该 Transformation这是最小和首选的资源
    declareManagedMemoryUseCaseAtOperatorScope声明此转换包含特定的运算符作用域托管内存用例。
    declareManagedMemoryUseCaseAtSlotScope声明此转换包含特定的solt作用域托管内存用例。

    其他的就不细说了,就是对属性的 set/get 操作…

    2.4. 获取属性相关(需要子类实现)

    名称含义
    getTransitivePredecessors获取前置的Transformation
    getInputs获取输入的Transformation

    三 .实现类

    在这里插入图片描述

    Transformation 的实现类很多,所以我们先看部分一级的实现类.
    在这里插入图片描述

    实现类描述
    PhysicalTransformation创建物理操作,它启用设置{@link ChainingStrategy}
    UnionTransformation此转换表示多个输入{@link Transformation Transformations}的并集。
    这不会创建物理操作,只会影响上游操作与下游操作的连接方式。
    PartitionTransformation此转换表示输入元素分区的更改。
    这不会创建物理操作,只会影响上游操作与下游操作的连接方式。
    SideOutputTransformation此transformation表示对具有给定{@link OutputTag}的上游操作的边输出的选择。
    CoFeedbackTransformationxxx
    FeedbackTransformationxxx

    3.1. PhysicalTransformation

    创建物理操作,它启用设置{@link ChainingStrategy}

    PhysicalTransformation 继承Transformation抽象类, 新增了一个 setChainingStrategy的方法, 通过该方法可以定义operator 的连接方式.

        /** Sets the chaining strategy of this {@code Transformation}. */
        public abstract void setChainingStrategy(ChainingStrategy strategy);
    
    • ChainingStrategy 是一个枚举类.

    定义算子的链接方案。
    当一个操作符链接到前置线程时,意味着它们在同一个线程中运行。
    一个operator可以包含多个步骤.
    StreamOperator使用的默认值是{@link#HEAD},这意味着operator没有链接到它的前一个operator。
    大多数的operators 将会以 {@link #ALWAYS} 复写. 意味着他们会尽可能的chained 前置operator。

    类型描述
    ALWAYS算子会尽可能的Chain在一起(为了优化性能,最好是使用最大数量的chain和加大算子的并行度)
    NEVER当前算子不会与前置和后置算子进行Chain
    HEAD当前算子允许被后置算子Chain,但不会与前置算子进行Chain
    HEAD_WITH_SOURCES与HEAD类似,但此策略会尝试Chain Source算子

    3.2. UnionTransformation

    UnionTransformation是Transformation的子类. 表示多个输入{@link Transformation Transformations}的并集。 这不会创建物理操作,只会影响上游操作与下游操作的连接方式。

    UnionTransformation 只有一个属性 . private final List<Transformation<T>> inputs; 通过UnionTransformation的构造方法传入.
    传入的时候会依次验证输入的类型和输出的类型是否一致.

     /**
         * 通过给定的Transformations 构建一个UnionTransformation
         * Creates a new {@code UnionTransformation} from the given input {@code Transformations}.
         *
         * <p>The input {@code Transformations} must all have the same type.
         *
         * @param inputs The list of input {@code Transformations}
         */
        public UnionTransformation(List<Transformation<T>> inputs) {
            super("Union", inputs.get(0).getOutputType(), inputs.get(0).getParallelism());
    
            for (Transformation<T> input : inputs) {
                if (!input.getOutputType().equals(getOutputType())) {
                    throw new UnsupportedOperationException("Type mismatch in input " + input);
                }
            }
    
            this.inputs = Lists.newArrayList(inputs);
        }
    

    实现了getInputs 方法和getTransitivePredecessors方法获取输入流…

        @Override
        public List<Transformation<?>> getInputs() {
            return new ArrayList<>(inputs);
        }
    
        @Override
        public List<Transformation<?>> getTransitivePredecessors() {
            List<Transformation<?>> result = Lists.newArrayList();
            result.add(this);
            for (Transformation<T> input : inputs) {
                result.addAll(input.getTransitivePredecessors());
            }
            return result;
        }
    

    3.3. PartitionTransformation

    PartitionTransformation是Transformation的子类.
    此转换表示输入元素分区的更改。
    它不会创建物理操作,它只会影响上游操作与下游操作的连接方式。

    3.3.1. 属性

    PartitionTransformation 有三个属性 :

        // 输入
        private final Transformation<T> input;
    
        // 分区器
        private final StreamPartitioner<T> partitioner;
    
        // ShuffleMode : 如果是流的话 应该是: PIPELINED
        private final ShuffleMode shuffleMode;
    
    

    3.3.2. 构造方法

    根据输入和StreamPartitioner 生成一个PartitionTransformation

       /**
         * Creates a new {@code PartitionTransformation} from the given input and {@link
         * StreamPartitioner}.
         *
         * @param input The input {@code Transformation}
         * @param partitioner The {@code StreamPartitioner}
         * @param shuffleMode The {@code ShuffleMode}
         */
        public PartitionTransformation(
                Transformation<T> input, StreamPartitioner<T> partitioner, ShuffleMode shuffleMode) {
            super("Partition", input.getOutputType(), input.getParallelism());
            this.input = input;
            this.partitioner = partitioner;
            this.shuffleMode = checkNotNull(shuffleMode);
        }
    

    3.3.3. 获取属性相关

    剩下的就是获取分区器, ShuffleMode 以及 获取输入相关的方法了…

    
        /**
         * Returns the {@code StreamPartitioner} that must be used for partitioning the elements of the
         * input {@code Transformation}.
         */
        public StreamPartitioner<T> getPartitioner() {
            return partitioner;
        }
    
        /** Returns the {@link ShuffleMode} of this {@link PartitionTransformation}. */
        public ShuffleMode getShuffleMode() {
            return shuffleMode;
        }
    
        @Override
        public List<Transformation<?>> getTransitivePredecessors() {
            List<Transformation<?>> result = Lists.newArrayList();
            result.add(this);
            result.addAll(input.getTransitivePredecessors());
            return result;
        }
    
        @Override
        public List<Transformation<?>> getInputs() {
            return Collections.singletonList(input);
        }
    

    3.3.4. ShuffleMode

    ShuffleMode 定义了operators 之间交换数据的模式.
    ShuffleMode 是一个枚举类,一共有三种.

    名称描述
    PIPELINED生产者和消费者同时在线. 生产出的数据立即会被消费者消费…
    BATCH生产者先产生数据至完成&停止. 之后 消费者启动消费数据.
    UNDEFINEDshuffle mode : 未定义 , 由框架决定 shuffle mode. 框架最后将选择{@link ShuffleMode#BATCH}或{@link ShuffleMode#PIPELINED}中的一个。

    3.3.5. StreamPartitioner

    在这里插入图片描述
    根据上图可以获得信息. StreamPartitioner实现了ChannelSelector 接口.
    ChannelSelector实现了IOReadableWritable 接口.
    IOReadableWritable 里面只有read和write接口.
    ChannelSelector对ChannelSelector接口进行了扩展.
    新增了setup/selectChannel/isBroadcast 三个方法.

    3.3.5.1. IOReadableWritable 定义的方法清单 :

    每个类必须自行选择他们自己的二进制序列&反序列化方式.
    特别是,records 必须实现此接口,以便指定如何将其数据传输到二进制表示形式。
    实现此接口时,请确保实现类具有默认(无参数)构造函数!

    名称描述
    void read(DataInputView in) throws IOException;从给定的数据输入视图读取对象的内部数据。
    void write(DataOutputView out) throws IOException;将对象的内部数据写入给定的数据输出视图。

    3.3.5.2. ChannelSelector 定义的方法清单 :

    {@link ChannelSelector} 决定数据记录如何写入到 logical channels .

    名称描述
    setup设置 初始化 channel selector 的数量.
    selectChannel返回数据写入的 logical channel 的索引
    为broadcast channel selectors 调用此方法是非法的,
    在这种情况下,此方法可能无法实现(例如,通过抛出{@link UnsupportedOperationException})。
    isBroadcast返回channel selector是否始终选择所有输出通道

    3.3.5.3. StreamPartitioner 定义的清单 :

    StreamPartitioner 是一个抽象类, 实现了ChannelSelector 接口 .

    里面只有一个属性 protected int numberOfChannels; 通过 setup(int numberOfChannels)方法来设置 channel的数量.
    同时实现 isBroadcast方法返回的值为 false . 重写了 equals , hashCode 方法 .

    新增的抽象方法 copy .

    定义了 getUpstreamSubtaskStateMapper & getDownstreamSubtaskStateMapper 方法.

    定义了恢复 in-flight 数据期间上游重新回放时此分区程序的行为。

        /**
         * Defines the behavior of this partitioner, when upstream rescaled during recovery of in-flight
         * data.
         */
        public SubtaskStateMapper getUpstreamSubtaskStateMapper() {
            return SubtaskStateMapper.ARBITRARY;
        }
    
        /**
         * Defines the behavior of this partitioner, when downstream rescaled during recovery of
         * in-flight data.
         */
        public abstract SubtaskStateMapper getDownstreamSubtaskStateMapper();
    

    3.3.6. SubtaskStateMapper

    SubtaskStateMapper是一个枚举类 .
    {@code SubtaskStateMapper}缩小了回放期间需要读取的子任务,以便在检查点中存储了in-flight中的数据时从特定子任务恢复。

    旧子任务到新子任务的映射可以是唯一的,也可以是非唯一的。

    唯一分配意味着一个特定的旧子任务只分配给一个新的子任务。

    非唯一分配需要向下筛选。
    这意味着接收方必须交叉验证反序列化记录是否真正属于新的子任务。

    大多数{@code SubtaskStateMapper}只会产生唯一的赋值,因此是最优的
    一些重缩放器,比如{@link#RANGE},创建了唯一映射和非唯一映射的混合,其中下游任务需要对一些映射的子任务进行过滤。

    名称描述分区器
    ARBITRARY额外的状态被重新分配到其他子任务,而没有任何特定的保证(只匹配上行和下行)。超类: StreamPartitioner [getUpstreamSubtaskStateMapper]
    DISCARD_EXTRA_STATE丢弃额外状态。如果所有子任务都已包含相同的信息(广播),则非常有用。BroadcastPartitioner [getUpstreamSubtaskStateMapper]
    FIRST将额外的子任务还原到第一个子任务。GlobalPartitioner
    FULL将状态复制到所有子任务。这种回放会造成巨大的开销,完全依赖于对下游数据进行过滤。CustomPartitionerWrapper
    RANGE将旧范围重新映射到新范围KeyGroupStreamPartitioner
    ROUND_ROBIN以循环方式重新分配子任务状态。ShufflePartitioner,
    RebalancePartitioner,
    ForwardPartitioner ,
    RescalePartitioner,
    BroadcastPartitioner[getDownstreamSubtaskStateMapper]

    3.4. SideOutputTransformation

    SideOutputTransformation是Transformation的子类.
    此transformation表示对具有给定{@link OutputTag}的上游操作的边输出的选择。

    这个类有两个属性 一个输入 private final Transformation<?> input;,一个输出 private final OutputTag<T> tag;

    输入和输出的参数由SideOutputTransformation构造方法指定.

        public SideOutputTransformation(Transformation<?> input, final OutputTag<T> tag) {
            super("SideOutput", tag.getTypeInfo(), requireNonNull(input).getParallelism());
            this.input = input;
            this.tag = requireNonNull(tag);
        }
    
    • OutputTag类型

    {@link OutputTag}是一个类型化和命名的标记,用于标记操作符的边输出。
    {@code OutputTag}必须始终是匿名内部类,以便Flink可以为泛型类型参数派生一个{@link TypeInformation}。
    示例:

    OutputTag<Tuple2<String, Long>> info = new OutputTag<Tuple2<String, Long>>(“late-data”){};

    1. 属性
      只有两个属性,一个id , 另外一个是typeInfo.
        private final String id;
    
        private final TypeInformation<T> typeInfo;
    
    1. 构造方法

    有两个构造方法

        /**
         * Creates a new named {@code OutputTag} with the given id.
         *
         * @param id The id of the created {@code OutputTag}.
         */
        public OutputTag(String id) {
            Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
            Preconditions.checkArgument(!id.isEmpty(), "OutputTag id must not be empty.");
            this.id = id;
    
            try {
                this.typeInfo = TypeExtractor.createTypeInfo(this, OutputTag.class, getClass(), 0);
            } catch (InvalidTypesException e) {
                throw new InvalidTypesException(
                        "Could not determine TypeInformation for the OutputTag type. "
                                + "The most common reason is forgetting to make the OutputTag an anonymous inner class. "
                                + "It is also not possible to use generic type variables with OutputTags, such as 'Tuple2<A, B>'.",
                        e);
            }
        }
    
        /**
         * Creates a new named {@code OutputTag} with the given id and output {@link TypeInformation}.
         *
         * @param id The id of the created {@code OutputTag}.
         * @param typeInfo The {@code TypeInformation} for the side output.
         */
        public OutputTag(String id, TypeInformation<T> typeInfo) {
            Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
            Preconditions.checkArgument(!id.isEmpty(), "OutputTag id must not be empty.");
            this.id = id;
            this.typeInfo = Preconditions.checkNotNull(typeInfo, "TypeInformation cannot be null.");
        }
    

    3.5. CoFeedbackTransformation

    CoFeedbackTransformation的子类.

    这表示拓扑中的反馈点。
    feedback 元素不需要和 上有的{@code Transformation} 匹配.
    因为仅仅允许{@code CoFeedbackTransformation}之后的操作是 {@link org.apache.flink.streaming.api.transformations.TwoInputTransformation TwoInputTransformations}.

    上游{@code Transformation}将连接到Co-Transform 的第一个输入,而反馈 edges 将连接到第二个输入。

    同时保留了输入边和反馈边的划分。它们也可以有不同的分区策略。
    然而,这要求反馈{@code Transformation}的并行性必须与输入{@code Transformation}的并行度匹配。

    上游{@code Transformation}未连接到此{@code CoFeedbackTransformation}。
    而是直接连接到 {@code TwoInputTransformation} 之后的 {@code CoFeedbackTransformation}.
    这与批处理中的迭代不同。

    1. 属性
      只有两个属性
        private final List<Transformation<F>> feedbackEdges;
    
        private final Long waitTime;
    
    1. addFeedbackEdge
        /**
         * Adds a feedback edge. The parallelism of the {@code Transformation} must match the
         * parallelism of the input {@code Transformation} of the upstream {@code Transformation}.
         *
         * @param transform The new feedback {@code Transformation}.
         */
        public void addFeedbackEdge(Transformation<F> transform) {
    
            if (transform.getParallelism() != this.getParallelism()) {
                throw new UnsupportedOperationException(
                        "Parallelism of the feedback stream must match the parallelism of the original"
                                + " stream. Parallelism of original stream: "
                                + this.getParallelism()
                                + "; parallelism of feedback stream: "
                                + transform.getParallelism());
            }
    
            feedbackEdges.add(transform);
        }
    

    四 .PhysicalTransformation 子类

    PhysicalTransformation 的子类有点多, 挑几个类型瞄一眼. 其他的等有空的时候再看…
    在这里插入图片描述

    4.1. OneInputTransformation

    此转换表示将{@link org.apache.flink.streaming.api.operators.OneInputStreamOperator} 应用于一个输入 {@link Transformation}.

    4.1.1. 属性

        private final Transformation<IN> input;
    
        private final StreamOperatorFactory<OUT> operatorFactory;
    
        private KeySelector<IN, ?> stateKeySelector;
    
        private TypeInformation<?> stateKeyType;
    

    4.1.2. 构造方法

        public OneInputTransformation(
                Transformation<IN> input,
                String name,
                StreamOperatorFactory<OUT> operatorFactory,
                TypeInformation<OUT> outputType,
                int parallelism) {
            super(name, outputType, parallelism);
            this.input = input;
            this.operatorFactory = operatorFactory;
        }
    

    4.2. TwoInputTransformation

    两个输入一个输出操作…

    /**
     * This Transformation represents the application of a {@link TwoInputStreamOperator} to two input
     * {@code Transformations}. The result is again only one stream.
     *
     * @param <IN1> The type of the elements in the first input {@code Transformation}
     * @param <IN2> The type of the elements in the second input {@code Transformation}
     * @param <OUT> The type of the elements that result from this {@code TwoInputTransformation}
     */
    

    4.3. AbstractMultipleInputTransformation

    AbstractMultipleInputTransformation抽象类…

    /**
     * Base class for transformations representing the application of a {@link
     * org.apache.flink.streaming.api.operators.MultipleInputStreamOperator} to input {@code
     * Transformations}. The result is again only one stream.
     *
     * @param <OUT> The type of the elements that result from this {@code MultipleInputTransformation}
     */
    
    • 属性
        protected final List<Transformation<?>> inputs = new ArrayList<>();
        protected final StreamOperatorFactory<OUT> operatorFactory;
    

    4.4. MultipleInputTransformation

    MultipleInputTransformation 是AbstractMultipleInputTransformation 子类.

    
    /** {@link AbstractMultipleInputTransformation} implementation for non-keyed streams. */
    @Internal
    public class MultipleInputTransformation<OUT> extends AbstractMultipleInputTransformation<OUT> {
        public MultipleInputTransformation(
                String name,
                StreamOperatorFactory<OUT> operatorFactory,
                TypeInformation<OUT> outputType,
                int parallelism) {
            super(name, operatorFactory, outputType, parallelism);
        }
    
        public MultipleInputTransformation<OUT> addInput(Transformation<?> input) {
            inputs.add(input);
            return this;
        }
    }
    
    

    4.5. KeyedMultipleInputTransformation

    KeyedMultipleInputTransformation 是AbstractMultipleInputTransformation 子类.

    • 新增了属性
    	private final List<KeySelector<?, ?>> stateKeySelectors = new ArrayList<>();
        protected final TypeInformation<?> stateKeyType;
    
    展开全文

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 83,858
精华内容 33,543
关键字:

transformation