精华内容
下载资源
问答
  • 时, Waiting for table metadata lock 能一直锁很久。官网的一段话,可以理解下8.10.4. Metadata LockingMySQL 5.5.3 and up uses metadata locking to manage access to objects (tables, triggers...

    在修改/增加表字段的时候,发现很慢,

    show processlist; 时, Waiting for table metadata lock 能一直锁很久。

    官网的一段话,可以理解下

    8.10.4. Metadata Locking

    MySQL 5.5.3 and up uses metadata locking to manage access to objects (tables, triggers, and so forth). Metadata locking is used to ensure data consistency but does involve some overhead, which increases as query volume increases. Metadata contention increases the more that multiple queries attempt to access the same objects.

    Metadata locking is not a replacement for the table definition case, and its mutxes and locks differ from the LOCK_open mutex. The following discussion provides some information about how metadata locking works.

    To ensure transaction serializability, the server must not permit one session to perform a data definition language (DDL) statement on a table that is used in an uncompleted transaction in another session. The server achieves this by acquiring metadata locks on tables used within a transaction and deferring release of those locks until the transaction ends. A metadata lock on a table prevents changes to the table's structure. This locking approach has the implication that a table that is being used by a transaction within one session cannot be used in DDL statements by other sessions until the transaction ends.

    This principle applies not only to transactional tables, but also to nontransactional tables. Suppose that a session begins a transaction that uses transactional table t and nontransactional table nt as follows:

    START TRANSACTION;

    SELECT * FROM t;

    SELECT * FROM nt;

    Metadata locks are held on both t and nt until the transaction ends. If another session attempts a DDL operation on either table, it blocks until metadata lock release at transaction end. For example, a second session blocks if it attempts any of these operations:

    DROP TABLE t;

    ALTER TABLE t ...;

    DROP TABLE nt;

    ALTER TABLE nt ...;

    If the server acquires metadata locks for a statement that is syntactically valid but fails during execution, it does not release the locks early. Lock release is still deferred to the end of the transaction because the failed statement is written to the binary log and the locks protect log consistency.

    In autocommit mode, each statement is in effect a complete transaction, so metadata locks acquired for the statement are held only to the end of the statement.

    Metadata locks acquired during a PREPARE statement are released once the statement has been prepared, even if preparation occurs within a multiple-statement transaction.

    Before MySQL 5.5.3, when a transaction acquired the equivalent of a metadata lock for a table used within a statement, it released the lock at the end of the statement. This approach had the disadvantage that if a DDL statement occurred for a table that was being used by another session in an active transaction, statements could be written to the binary log in the wrong order

    一个没提交的事务使用了A表, 另外一个session 对A表进行alter,出现waiting for table metadata lock

    MySQL版本为5.6.12。

    在进行alter table操作时,有时会出现Waiting for table metadata lock的等待场景。而且,一旦alter table TableA的操作停滞在Waiting for table metadata lock的状态,后续对TableA的任何操作(包括读)都无法进行,也会在Opening tables的阶段进入Waiting for table metadata lock的队列。如果是产品环境的核心表出现了这样的锁等待队列,就会造成灾难性的后果。

    造成alter table产生Waiting for table metadata lock的原因其实很简单,一般是以下几个简单的场景:

    场景一:

    通过show processlist可以看到TableA上有正在进行的操作(包括读),此时alter table语句无法获取到metadata 独占锁,会进行等待。

    这是最基本的一种情形,这个和mysql 5.6中的online ddl并不冲突。一般alter table的操作过程中(见下图),在after create步骤会获取metadata 独占锁,当进行到altering table的过程时(通常是最花时间的步骤),对该表的读写都可以正常进行,这就是online ddl的表现,并不会像之前在整个alter table过程中阻塞写入。(当然,也并不是所有类型的alter操作都能online的,具体可以参见官方手册:http://dev.mysql.com/doc/refman/5.6/en/innodb-create-index-overview.html)

    4e61bbfd7653923c26efbadf2b24f3ef.png

    场景二:

    通过show processlist看不到TableA上有任何操作,但实际上存在有未提交的事务,可以在information_schema.innodb_trx中查看到。在事务没有完成之前,TableA上的锁不会释放,alter table同样获取不到metadata的独占锁。

    场景三:

    通过show processlist看不到TableA上有任何操作,在information_schema.innodb_trx中也没有任何进行中的事务。这很可能是因为在一个显式的事务中,对TableA进行了一个失败的操作(比如查询了一个不存在的字段),这时事务没有开始,但是失败语句获取到的锁依然有效。从performance_schema.events_statements_current表中可以查到失败的语句。

    官方手册上对此的说明如下:

    If the server acquires metadata locks for a statement that is syntactically valid but fails during execution, it does not release the locks early. Lock release is still deferred to the end of the transaction because the failed statement is written to the binary log and the locks protect log consistency.

    也就是说除了语法错误,其他错误语句获取到的锁在这个事务提交或回滚之前,仍然不会释放掉。because the failed statement is written to the binary log and the locks protect log consistency 但是解释这一行为的原因很难理解,因为错误的语句根本不会被记录到二进制日志。

    总之,alter table的语句是很危险的,在操作之前最好确认对要操作的表没有任何进行中的操作、没有未提交事务、也没有显式事务中的报错语句。如果有alter table的维护任务,在无人监管的时候运行,最好通过lock_wait_timeout设置好超时时间,避免长时间的metedata锁等待。

    参考:

    展开全文
  • ========================================================================ALTER TABLE 和FLUSH TABLE导致的间接等待场景:1、会话A执行耗时较长的操作;2、会话B执行ALTER TABLE 或FLUSH TABLES等操作时,会向...

    ========================================================================

    ALTER TABLE 和FLUSH TABLE导致的间接等待

    场景:

    1、会话A执行耗时较长的操作;

    2、会话B执行ALTER TABLE 或FLUSH TABLES等操作时,会向其他会话(线程)发送表变更通知,要求其他会话关闭再重新打开相关表;

    3、会话A执行过程中收到会话B的变更通知,在会话A执行结束前,会话A阻塞会话B执行;

    4、会话C收到会话B的变更通知,等待会话B完成,形成等待链:会话C>>会话B>>会话A

    5、会话B因为其他原因执行失败或被关闭,ALTER TABLE或FLSUH TABLE等操作被取消

    6、会话C转为等待会话A执行完成。

    官方文档:

    The thread got a notification that the underlying structure for a table has changed and it needs to reopen the table to get the new structure. However, to reopen the table, it must wait until all other threads have closed the table in question.

    This notification takes place if another thread has used FLUSH TABLES or one of the following statements on the table in question: FLUSH TABLES tbl_name, ALTER TABLE, RENAME TABLE, REPAIR TABLE, ANALYZE TABLE, orOPTIMIZE TABLE.

    ========================================================================

    ALTER TABLE过程报主键重复

    在MySQL 5.6版本中引入Online DDL特性,很多ALTER TABLE操作可以联机修改,该特性使用ROW LOG来保存ALTER操作期间发生的数据变化,并回放到新表中,保证数据一致性。

    如果在业务高峰期执行Online DDL操作,可能报下面错误:

    ERROR 1062 (23000) at line 13: Duplicate entry 'xxx' for key 'PRIMARY'

    解决办法:

    1、将DDL操作移到业务低峰期执行,降低错误出现概率

    2、如果业务允许阻塞,修改ALTER TABLE语句使用阻塞方式执行

    如ALTER命令为:

    ALTER TABLE TB001 ADD C2 INT;

    修改为:

    ALTER TABLE TB001 ADD C2 INT, ALGORITHM =COPY;

    ========================================================================

    修改表名

    修改表名有两种语法:

    RENAME TABLE old_table TO new_table;

    ALTER TABLE old_table RENAME new_table;

    使用RENAME TABLE方式可以一次修改多个表的表名,MySQL保证RENAME操作的原子性。

    When you execute RENAME TABLE, you cannot have any locked tables or active transactions. With that condition satisfied, the rename operation is done atomically; no other session can access any of the tables while the rename is in progress.

    展开全文
  • truncate table比较慢

    2021-05-01 07:21:37
    create table t4 as select * from dba_tables;create index t4_ind_1on t4(OWNER );create index t4_ind_2on t4(TABLE_NAME );create index t4_ind_3on t4(TABLESPAC...

    create table t4 as select * from dba_tables;

    create index t4_ind_1  on t4(OWNER                       );

    create index t4_ind_2  on t4(TABLE_NAME                  );

    create index t4_ind_3  on t4(TABLESPACE_NAME             );

    create index t4_ind_4  on t4(CLUSTER_NAME                );

    create index t4_ind_5  on t4(IOT_NAME                    );

    create index t4_ind_6  on t4(STATUS                      );

    create index t4_ind_7  on t4(PCT_FREE                    );

    create index t4_ind_8  on t4(PCT_USED                    );

    create index t4_ind_9  on t4(INI_TRANS                   );

    create index t4_ind_10 on t4(MAX_TRANS                   );

    create index t4_ind_11 on t4(INITIAL_EXTENT              );

    create index t4_ind_12 on t4(NEXT_EXTENT                 );

    create index t4_ind_13 on t4(MIN_EXTENTS                 );

    create index t4_ind_14 on t4(MAX_EXTENTS                 );

    create index t4_ind_15 on t4(PCT_INCREASE                );

    create index t4_ind_16 on t4(FREELISTS                   );

    create index t4_ind_17 on t4(FREELIST_GROUPS             );

    create index t4_ind_18 on t4(LOGGING                     );

    create index t4_ind_19 on t4(BACKED_UP                   );

    create index t4_ind_20 on t4(NUM_ROWS                    );

    create index t4_ind_21 on t4(EMPTY_BLOCKS                );

    create index t4_ind_22 on t4(AVG_SPACE                   );

    create index t4_ind_23 on t4(CHAIN_CNT                   );

    create index t4_ind_24 on t4(AVG_ROW_LEN                 );

    create index t4_ind_25 on t4(AVG_SPACE_FREELIST_BLOCKS   );

    create index t4_ind_26 on t4(NUM_FREELIST_BLOCKS         );

    create index t4_ind_27 on t4(DEGREE                      );

    create index t4_ind_28 on t4(INSTANCES                   );

    create index t4_ind_29 on t4(CACHE                       );

    create index t4_ind_30 on t4(TABLE_LOCK                  );

    create index t4_ind_31 on t4(SAMPLE_SIZE                 );

    create index t4_ind_32 on t4(LAST_ANALYZED               );

    create index t4_ind_33 on t4(PARTITIONED                 );

    create index t4_ind_34 on t4(IOT_TYPE                    );

    create index t4_ind_35 on t4(TEMPORARY                   );

    create index t4_ind_36 on t4(SECONDARY                   );

    create index t4_ind_37 on t4(NESTED                      );

    create index t4_ind_38 on t4(BUFFER_POOL                 );

    create index t4_ind_39 on t4(ROW_MOVEMENT                );

    create index t4_ind_40 on t4(GLOBAL_STATS                );

    create index t4_ind_41 on t4(USER_STATS                  );

    create index t4_ind_42 on t4(DURATION                    );

    create index t4_ind_43 on t4(SKIP_CORRUPT                );

    create index t4_ind_44 on t4(MONITORING                  );

    create index t4_ind_45 on t4(CLUSTER_OWNER               );

    create index t4_ind_46 on t4(DEPENDENCIES                );

    create index t4_ind_47 on t4(COMPRESSION                 );

    SQL> insert into t4 select * from t4;

    1322 rows created.

    SQL> /

    2644 rows created.

    SQL> /

    5288 rows created.

    SQL> /

    10576 rows created.

    SQL> /

    insert into t4 select * from t4

    *

    ERROR at line 1:

    ORA-01654: unable to extend index ZJD.T4_IND_38 by 128 in tablespace ORCL02

    SQL> commit;

    Commit complete.

    truncate table t4;

    展开全文
  • 背景:Python 自定义函数是 PyFlink Table API 中最重要的功能之一,其允许用户在 PyFlink Table API 中使用 Python 语言开发的自定义函数,极大...

    背景:Python 自定义函数是 PyFlink Table API 中最重要的功能之一,其允许用户在 PyFlink Table API 中使用 Python 语言开发的自定义函数,极大地拓宽了 Python Table API 的使用范围。

    目前 Python 自定义函数的功能已经非常完善,支持多种类型的自定义函数,比如 UDF(scalar function)、UDTF(table function)、UDAF(aggregate function),UDTAF(table aggregate function,1.13 支持)、Panda UDF、Pandas UDAF 等。接下来,我们详细介绍一下如何在 PyFlink Table API 作业中使用 Python 自定义函数。

    Tips:点击文末「阅读原文」即可查看更多技术干货~

     GitHub 地址 

    https://github.com/apache/flink

    欢迎大家给 Flink 点赞送 star~

    一、Python 自定义函数基础


    根据输入 / 输出数据的行数,Flink Table API & SQL 中,自定义函数可以分为以下几类:

    自定义函数

    Single Row Input

    Multiple Row Input

    Single Row Output

    ScalarFunction

    AggregateFunction

    Multiple Row Output

    TableFunction

    TableAggregateFunction

    PyFlink 针对以上四种类型的自定义函数都提供了支持,接下来,我们分别看一下每种类型的自定义函数如何使用。

    1. Python UDF

    Python UDF,即 Python ScalarFunction,针对每一条输入数据,仅产生一条输出数据。比如以下示例,展示了通过多种方式,来定义名字为 "sub_string" 的 Python UDF:

    from pyflink.table.udf import udf, FunctionContext, ScalarFunctionfrom pyflink.table import DataTypes
    方式一:@udf(result_type=DataTypes.STRING())def sub_string(s: str, begin: int, end: int):    return s[begin:end]
    方式二:sub_string = udf(lambda s, begin, end: s[begin:end], result_type=DataTypes.STRING())
    方式三:class SubString(object):    def __call__(self, s: str, begin: int, end: int):        return s[begin:end]
    sub_string = udf(SubString(), result_type=DataTypes.STRING())
    方式四:def sub_string(s: str, begin: int, end: int):    return s[begin:end]
    sub_string_begin_1 = udf(functools.partial(sub_string, begin=1), result_type=DataTypes.STRING())
    方式五:class SubString(ScalarFunction):    def open(self, function_context: FunctionContext):        pass
        def eval(self, s: str, begin: int, end: int):        return s[begin:end]
    sub_string = udf(SubString(), result_type=DataTypes.STRING())
    
    
    

    说明:

    • 需要通过名字为 “ udf ” 的装饰器,声明这是一个 scalar function;

    • 需要通过装饰器中的 result_type 参数,声明 scalar function 的结果类型;

    • 上述方式五,通过继承 ScalarFunction 的方式来定义 Python UDF 有以下用处:

      • ScalarFunction 的基类 UserDefinedFunction 中定义了一个 open 方法,该方法只在作业初始化时执行一次,因此可以利用该方法,做一些初始化工作,比如加载机器学习模型、连接外部服务等。

      • 此外,还可以通过 open 方法中的 function_context 参数,注册及使用 metrics。

    
    
    env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()t_env = StreamTableEnvironment.create(environment_settings=env_settings)
    table = t_env.from_elements([("hello", 1), ("world", 2), ("flink", 3)], ['a', 'b'])table.select(sub_string(table.a, 1, 3))
    

    2. Python UDTF

    Python UDTF,即 Python TableFunction,针对每一条输入数据,Python UDTF 可以产生 0 条、1 条或者多条输出数据,此外,一条输出数据可以包含多个列。比如以下示例,定义了一个名字为 split 的Python UDF,以指定字符串为分隔符,将输入字符串切分成两个字符串:

    from pyflink.table.udf import udtffrom pyflink.table import DataTypes
    @udtf(result_types=[DataTypes.STRING(), DataTypes.STRING()])def split(s: str, sep: str):    splits = s.split(sep)    yield splits[0], splits[1]
    
    
    

    说明:

    • 需要通过名字为 “ udtf ” 的装饰器,声明这是一个 table function;

    • 需要通过装饰器中的 result_types 参数,声明 table function 的结果类型。由于 table function 每条输出可以包含多个列,result_types 需要指定所有输出列的类型;

    • Python UDTF 的定义,也支持 Python UDF 章节中所列出的多种定义方式,这里只展示了其中一种。

    定义完 Python UDTF 之后,可以直接在 Python Table API 中使用:

    env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()t_env = StreamTableEnvironment.create(environment_settings=env_settings)
    table = t_env.from_elements([("hello|word", 1), ("abc|def", 2)], ['a', 'b'])
    table.join_lateral(split(table.a, '|').alias("c1, c2"))table.left_outer_join_lateral(split(table.a, '|').alias("c1, c2"))
    


    3. Python UDAF

    Python UDAF,即 Python AggregateFunction。Python UDAF 用来针对一组数据进行聚合运算,比如同一个 window 下的多条数据、或者同一个 key 下的多条数据等。针对同一组输入数据,Python AggregateFunction 产生一条输出数据。比如以下示例,定义了一个名字为 weighted_avg 的 Python UDAF:

    from pyflink.common import Rowfrom pyflink.table import AggregateFunction, DataTypesfrom pyflink.table.udf import udaf
    
    class WeightedAvg(AggregateFunction):
        def create_accumulator(self):        # Row(sum, count)        return Row(0, 0)
        def get_value(self, accumulator: Row) -> float:        if accumulator[1] == 0:            return 0        else:            return accumulator[0] / accumulator[1]
        def accumulate(self, accumulator: Row, value, weight):        accumulator[0] += value * weight        accumulator[1] += weight
        def retract(self, accumulator: Row, value, weight):        accumulator[0] -= value * weight        accumulator[1] -= weight
    
    weighted_avg = udaf(f=WeightedAvg(),                    result_type=DataTypes.DOUBLE(),                    accumulator_type=DataTypes.ROW([                        DataTypes.FIELD("f0", DataTypes.BIGINT()),                        DataTypes.FIELD("f1", DataTypes.BIGINT())]))
    

    说明:

    • 需要通过名字为 “ udaf ” 的装饰器,声明这是一个 aggregate function,

    • 需要分别通过装饰器中的 result_type 及 accumulator_type 参数,声明 aggregate function 的结果类型及 accumulator 类型;

    • create_accumulator,get_value 和 accumulate 这 3 个方法必须要定义,retract 方法可以根据需要定义,详细信息可以参见 Flink 官方文档 [1];需要注意的是,由于必须定义 create_accumulator,get_value 和 accumulate 这 3 个方法,Python UDAF 只能通过继承AggregateFunction 的方式进行定义(Pandas UDAF 没有这方面的限制)。

    定义完 Python UDAF 之后,可以在 Python Table API 中这样使用:

    env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()t_env = StreamTableEnvironment.create(environment_settings=env_settings)
    t = t_env.from_elements([(1, 2, "Lee"), (3, 4, "Jay"), (5, 6, "Jay"), (7, 8, "Lee")],                        ["value", "count", "name"])
    t.group_by(t.name).select(weighted_avg(t.value, t.count).alias("avg"))
    


    4. Python UDTAF

    Python UDTAF,即 Python TableAggregateFunction。Python UDTAF 用来针对一组数据进行聚合运算,比如同一个 window 下的多条数据、或者同一个 key 下的多条数据等,与 Python UDAF 不同的是,针对同一组输入数据,Python UDTAF 可以产生 0 条、1 条、甚至多条输出数据。

    以下示例,定义了一个名字为 Top2 的 Python UDTAF:

    from pyflink.common import Rowfrom pyflink.table import DataTypesfrom pyflink.table.udf import udtaf, TableAggregateFunction
    class Top2(TableAggregateFunction):
        def create_accumulator(self):        # 存储当前最大的两个值        return [None, None]
        def accumulate(self, accumulator, input_row):        if input_row[0] is not None:            # 新的输入值最大            if accumulator[0] is None or input_row[0] > accumulator[0]:                accumulator[1] = accumulator[0]                accumulator[0] = input_row[0]            # 新的输入值次大            elif accumulator[1] is None or input_row[0] > accumulator[1]:                accumulator[1] = input_row[0]
        def emit_value(self, accumulator):         yield Row(accumulator[0])        if accumulator[1] is not None:            yield Row(accumulator[1])
    top2 = udtaf(f=Top2(),             result_type=DataTypes.ROW([DataTypes.FIELD("a", DataTypes.BIGINT())]),             accumulator_type=DataTypes.ARRAY(DataTypes.BIGINT()))
    
    
    

    说明:

    • Python UDTAF 功能是 Flink 1.13 之后支持的新功能;

    • create_accumulator,accumulate 和 emit_value 这 3 个方法必须定义,此外 TableAggregateFunction 中支持 retract、merge 等方法,可以根据需要选择是否定义,详细信息可以参见 Flink 官方文档[2]。

    定义完 Python UDTAF 之后,可以在 Python Table API 中这样使用:

    env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()t_env = StreamTableEnvironment.create(environment_settings=env_settings)
    t = t_env.from_elements([(1, 'Hi', 'Hello'),                         (3, 'Hi', 'hi'),                         (5, 'Hi2', 'hi'),                         (2, 'Hi', 'Hello'),                         (7, 'Hi', 'Hello')],                        ['a', 'b', 'c'])
    t_env.execute_sql("""       CREATE TABLE my_sink (         word VARCHAR,         `sum` BIGINT       ) WITH (         'connector' = 'print'       )    """)
    result = t.group_by(t.b).flat_aggregate(top2).select("b, a").execute_insert("my_sink")
    # 1)等待作业执行结束,用于local执行,否则可能作业尚未执行结束,该脚本已退出,会导致minicluster过早退出# 2)当作业通过detach模式往remote集群提交时,比如YARN/Standalone/K8s等,需要移除该方法result.wait()
    

    当执行以上程序,可以看到类似如下输出:

    11> +I[Hi, 7]10> +I[Hi2, 5]11> +I[Hi, 3]
    
    
    

    说明:

    • Python UDTAF 只能用于 Table API,不能用于 SQL 语句中;

    • flat_aggregate 的结果包含了原始的 grouping 列以及 UDTAF(top 2)的输出,因此,可以在 select 中访问列 “ b ”。

    二、Python 自定义函数进阶


    1. 在纯 SQL 作业中使用 Python 自定义函数

    Flink SQL 中的 CREATE FUNCTION 语句支持注册 Python 自定义函数,因此用户除了可以在 PyFlink Table API 作业中使用 Python 自定义函数之外,还可以在纯 SQL 作业中使用 Python 自定义函数。

    CREATE TEMPORARY FUNCTION sub_string AS 'test_udf.sub_string' LANGUAGE PYTHON
    CREATE TABLE source (  a VARCHAR) WITH (  'connector' = 'datagen');
    CREATE TABLE sink (  a VARCHAR) WITH (  'connector' = 'print');
    INSERT INTO sinkSELECT sub_string(a, 1, 3)FROM source;
    


    2. 在 Java 作业中使用 Python 自定义函数

    用户可以通过 DDL 的方式注册 Python 自定义函数,这意味着,用户也可以在 Java Table API 作业中使用 Python 自定义函数,比如:

    TableEnvironment tEnv = TableEnvironment.create(            EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());tEnv.executeSql("CREATE TEMPORARY FUNCTION sub_string AS 'test_udf.sub_string' LANGUAGE PYTHON");tEnv.createTemporaryView("source", tEnv.fromValues("hello", "world", "flink").as("a"));tEnv.executeSql("SELECT sub_string(a) FROM source").collect();
    
    
    

    详细示例可以参见 PyFlink Playground [3]。

    该功能的一个重要用处是将 Java 算子与 Python 算子混用。用户可以使用 Java 语言来开发绝大部分的作业逻辑,当作业逻辑中的某些部分必须使用 Python 语言来编写时,可以通过如上方式来调用使用 Python 语言编写的自定义函数。

    如果是 DataStream 作业,可以先将 DataStream 转换成 Table,然后再通过上述方式,调用 Python 语言编写的自定义函数。

    3. 依赖管理

    在 Python 自定义函数中访问第三方 Python 库是非常常见的需求,另外,在机器学习预测场景中,用户也可能需要在 Python 自定义函数中加载一个机器学习模型。当我们通过 local 模式执行 PyFlink 作业时,可以将第三方 Python 库安装在本地 Python 环境中,或者将机器学习模型下载到本地;然而当我们将 PyFlink 作业提交到远程执行的时候,这也可能会出现一些问题:

    • 第三方 Python 库如何被 Python 自定义函数访问。不同的作业,对于 Python 库的版本要求是不一样的,将第三方 Python 库预安装到集群的 Python 环境中,只适用于安装一些公共的依赖,不能解决不同作业对于 Python 依赖个性化的需求;

    • 机器学习模型或者数据文件,如何分发到集群节点,并最终被 Python 自定义函数访问。

    除此之外,依赖可能还包括 JAR 包等,PyFlink 中针对各种依赖提供了多种解决方案:

    依赖

    类型

    解决方案

    用途描述

    示例(flink run)

    flink run参数

    配置项

    API

    作业入口文件

    -py / --python

    指定作业的入口文件,只能是.py文件

    -py file:///path/to/table_api_demo.py

    作业入口entry module

    -pym / --pyModule

    指定作业的entry module,功能和

    --python类似,可用于当作业的Python文件为zip包等情况,无法通过

    --python指定的时候,相比

    --python来说,更通用

    -pym table_api_demo-pyfs file:///path/to/table_api_demo.py

    Python三方库文件

    -pyfs / --pyFiles

    python.files

    add_python_file

    指定一个到多个Python文件(.py/.zip/.whl等,逗号分割),这些Python文件在作业执行时,会放到Python进程的PYTHONPATH中,可以在Python自定义函数中直接访问

    -pyfs file:///path/to/table_api_demo.py,file:///path/to/deps.zip

    存档文件

    -pyarch /--pyArchives

    python.archives

    add_python_archive

    指定一个到多个存档文件(逗号分割),这些存档文件,在作业执行的时候,会被解压,并放到Python进程的工作目录,可以通过相对路径的方式进行访问

    -pyarchfile:///path/to/venv.zip

    Python解释器路径

    -pyexec / --pyExecutable

    python.executable

    set_python_executable

    指定作业执行时,所使用的Python解释器路径

    -pyarchfile:///path/to/venv.zip-pyexec venv.zip/venv/bin/python3

    requirements文件

    -pyreq / --pyRequirements

    python.requirements

    set_python_requirements

    指定requirements文件,requirements文件中定义了作业的Python三方库依赖,作业执行时,会根据requirements的内容,通过pip安装相关依赖

    -pyreq requirements.txt

    JAR包

    pipeline.classpaths,pipeline.jars

    没有专门的API,可以通过configuration的set_string方法设置

    指定作业依赖的JAR包,通常用于指定connector JAR包

    说明:

    • 需要注意的是,Python UDF 的实现所在的文件,也需要在作业执行的时候,作为依赖文件上传;

    • 可以通过合用 “存档文件” 与 “ Python 解释器路径”,指定作业使用上传的 Python 虚拟环境来执行,比如:

    table_env.add_python_archive("/path/to/py_env.zip")
    # 指定使用py_env.zip包中带的python来执行用户自定义函数,必须通过相对路径来指定table_env.get_config().set_python_executable("py_env.zip/py_env/bin/python")
    
    • 推荐用户使用 conda 来构建 Python 虚拟环境,conda 构建的 Python 虚拟环境包含了执行 Python 所需的绝大多数底层库,可以极大地避免当本地环境与集群环境不一样时,所构建的 Python 虚拟环境在集群执行时,缺少各种底层依赖库的问题。关于如何使用 conda 构建的 Python 虚拟环境,可以参考阿里云 VVP 文档中 “使用 Python 三方包” 章节的介绍 [4]

    • 有些 Python 三方库需要安装才能使用,即并非 ”将其下载下来就可以直接放到 PYTHONPATH 中引用“,针对这种类型的 Python 三方库,有两种解决方案:

      • 将其安装在 Python 虚拟环境之中,指定作业运行使用所构建的 Python 虚拟环境;

      • 找一台与集群环境相同的机器(或 docker),安装所需的 Python 三方库,然后将安装文件打包。该方式相对于 Python 虚拟环境来说,打包文件比较小。详情可以参考阿里云 VVP 文档中 “使用自定义的 Python 虚拟环境” 章节的介绍 [5]。

    4. 调试

    PyFlink 支持用户通过远程调试的方式,来调试 Python 自定义函数,具体方法可以参见文章 “如何从 0 到 1 开发 PyFlink API 作业”  [6] 中 “远程调试” 章节的介绍。

    另外,用户还可以在 Python 自定义函数中,通过 logging 的方式,打印日志。需要注意的是,日志输出需要在 TaskManager 的日志文件中查看,而不是当前 console。具体使用方式,请参见 “如何从 0 到 1 开发 PyFlink API 作业”  [6] 中 “自定义日志” 章节的介绍。需要注意的是,当通过 local 方式运行作业的时候,TM 的日志位于 PyFlink 的安装目录,比如:

    >>> import pyflink

    ['/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/pyflink']

    5. 调优

    Python 自定义函数的性能在很大程度上取决于 Python 自定义函数自身的实现,如果遇到性能问题,您首先需要想办法尽可能优化 Python 自定义函数的实现。

    除此之外,Python 自定义函数的性能也受以下参数取值的影响。

    参数

    说明

    python.fn-execution.bundle.size

    Python自定义函数的执行是异步的,在作业执行过程中,Java算子将数据异步发送给Python进程进行处理。Java算子在将数据发送给Python进程之前,会先将数据缓存起来,到达一定阈值之后,再发送给Python进程。python.fn-execution.bundle.size参数可用来控制可缓存的数据最大条数,默认值为100000。

    python.fn-execution.bundle.time

    用来控制数据的最大缓存时间。当缓存的数据条数到达python.fn-execution.bundle.size定义的阈值或缓存时间到达python.fn-execution.bundle.time定义的阈值时,会触发缓存数据的计算。默认值为1000,单位是毫秒。

    python.fn-execution.arrow.batch.size

    用来控制当使用Pandas UDF时,一个arrow batch可容纳的数据最大条数,默认值为10000。说明 python.fn-execution.arrow.batch.size参数值不能大于python.fn-execution.bundle.size参数值。

    说明:

    • checkpoint 时,会触发缓存数据的计算,因此当上述参数配置的值过大时,可能会导致checkpoint 时需要处理过多的数据,从而导致 checkpoint 时间过长,甚至会导致 checkpoint 失败。当遇到作业的 checkpoint 时间比较长的问题时,可以尝试减少上述参数的取值。

    三、常见问题


    1)Python 自定义函数的实际返回值类型与 result_type 中声明的类型不一致,该问题会导致 Java 算子在收到 Python 自定义函数的执行结果,进行反序列化时报错,错误堆栈类似:

    Caused by: java.io.EOFException    at java.io.DataInputStream.readInt(DataInputStream.java:392) ~[?:1.8.0_261]    at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserializeInternal(StringDataSerializer.java:91) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]    at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:87) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]    at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:36) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]    at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserializeInternal(MapDataSerializer.java:124) ~[flink-python_2.11-1.12.0.jar:1.12.0]    at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:107) ~[flink-python_2.11-1.12.0.jar:1.12.0]    at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:46) ~[flink-python_2.11-1.12.0.jar:1.12.0]    at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:104) ~[flink-python_2.11-1.12.0.jar:1.12.0]    at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:49) ~[flink-python_2.11-1.12.0.jar:1.12.0]    at org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.emitResult(RowDataPythonScalarFunctionOperator.java:84) ~[flink-python_2.11-1.12.0.jar:1.12.0]
    

    2)在 Python 自定义函数的 init 方法里实例化了一个不能被 cloudpickle 序列化的对象。

    在提交作业的时候,PyFlink 会通过 cloudpickle 序列化 Python 自定义函数,若 Python 自定义函数包含不能被 cloudpickle 序列化的对象,则会遇到类似错误:TypeError: can't pickle xxx,可以将这种变量放在 open 方法里初始化。

    3)在 Python 自定义函数的 init 方法里 load 一个非常大的数据文件。

    由于在提交作业的时候,PyFlink 会通过 cloudpickle 序列化 Python 自定义函数,若在 init 方法里 load 一个非常大的数据文件,则整个数据文件都会被序列化并作为 Python 自定义函数实现的一部分,若数据文件非常大,可能会导致作业执行失败,可以将 load 数据文件的操作放在 open 方法里执行。

    4)客户端 Python 环境与集群端 Python 环境不一致,比如 Python 版本不一致、PyFlink 版本不一致(大版本需要保持一致,比如都为 1.12.x)等。

    四、总结


    在这篇文章中,我们主要介绍了各种 Python 自定义函数的定义及使用方式,以及 Python 依赖管理、 Python 自定义函数调试及调优等方面的信息,希望可以帮助用户了解 Python 自定义函数。接下来,我们会继续推出 PyFlink 系列文章,帮助 PyFlink 用户深入了解 PyFlink 中各种功能、应用场景、最佳实践等。

    另外,阿里云实时计算生态团队长期招聘优秀大数据人才(包括实习+社招),我们的工作包括:

    • 实时机器学习:支持机器学习场景下实时特征工程和 AI 引擎配合,基于 Apache Flink 及其生态打造实时机器学习的标准,推动例如搜索、推荐、广告、风控等场景的全面实时化;

    • 大数据 + AI 一体化:包括编程语言一体化 (PyFlink 相关工作),执行引擎集成化 (TF on Flink),工作流及管理一体化(Flink AI Flow)。

    如果你对开源、大数据或者 AI 感兴趣,请发简历到:fudian.fd@alibaba-inc.com

    此外,也欢迎大家加入 “PyFlink交流群”,交流 PyFlink 相关的问题。

    引用链接:

    [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/python_udfs.html#aggregate-functions

    [2] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/table/udfs/python_udfs/#table-aggregate-functions

    [3] https://github.com/pyflink/playgrounds#7-python-udf-used-in-java-table-api-jobs

    [4] https://help.aliyun.com/document_detail/207351.html?spm=a2c4g.11186623.6.687.1fe76f50loCz96#title-09r-29j-9d7

    [5] https://help.aliyun.com/document_detail/207351.html?spm=a2c4g.11186623.6.687.4b18419aCuhgmq#title-r01-50c-j82

    [6] https://mp.weixin.qq.com/s/GyFTjQl6ch8jc733mpCP7Q

    更多 Flink 相关技术问题,可扫码加入社区钉钉交流群~


    ▼ 关注「Flink 中文社区」,获取更多技术干货 ▼

     戳我,查看更多技术干货~

    展开全文
  • 由于InnoDB预设是Row-Level Lock,所以只有「明确」的指定主键,MySQL才会执行Row lock (只锁住被选取的资料例) ,否则MySQL将会执行Table Lock (将整个资料表单给锁住)。举个例子: 假设有个表单t,里面有id跟name二...
  • groupBy 分组后,做select查询操作,...Exception in thread "main" org.apache.flink.table.api.TableException: toAppendStream doesn't support consuming update changes which is produced by node GroupAggreg
  • mysql tmp_table_size=256m

    2021-02-02 03:32:04
    table_cache=512innodb_additional_mem_pool_size=8Minnodb_flush_log_at_trx_commit=0innodb_log_buffer_size=4Minnodb_thread_concurrency=8key_buffer_size=128Mtmp_table_size=128Mread_buffer_size=4Mread_rnd_...
  • 第三步报错ORA-00054和LOCK table无关,而是和和第二步insert没有提交有关。insert已经其他DML语句执行的时候会给相关对象加个TM锁,以防止该对象再被操作的时候(DML--数据操控语言)发生结构改变,即使DDL语句。SQL&...
  • run_threads \#14 que_eval_sql \#15 row_drop_table_for_mysql \#16 ha_innobase::delete_table \#17 ha_delete_table \#18 mysql_rm_table_no_locks \#19 mysql_rm_table \#20 mysql_execute_command \#21 mysql_...
  • ABAP: AT ... ENDAT的使用

    2021-02-24 09:22:25
    这里我们就会用到AT… ENDAT。 这里包括: new , first , last , end of . 使用前提:一般在loop中使用,要提前对内表进行排序sort,排序的主键为统计时用到的节点(比如,按销售组织统计,节点为vkorg),Loop不能...
  • 在原有的users 表和orders表上,为orders添加外键altertabletbl_orderaddforeignkeyfk_user_id(user_id)referencesmgie_users(ID)...错误10:56:45 alter table tbl_order add foreign key...
  • 对于内表修改命令,MODIFY 与 MODIFYtable的区别: MODIFYtable 1、是出现在LOOP外对内表的修改; 2、有一个前提是该内表的定义一定要有主键,如果没有,该语句不能成功。 3、排序表和散列表的主表键是只读的...
  • Itseems at first glance perfectly reasonable to simply truncate a temporarytable, then repopulate for another usage. And then to do the temporarypoplulate/truncate operations in concurrent batches ...
  • ROWS ALTER ALWAYS ANALYZE ANCILLARY AND AND_EQUAL ANTIJOIN ANY APPEND APPLY ARCHIVE ARCHIVELOG ARRAY AS ASC ASSOCIATE AT ATTRIBUTE ATTRIBUTES AUDIT AUTHENTICATED AUTHENTICATION AUTHID AUTHORIZATION ...
  • In an AWR report, one should examine theForeground Wait Class table and the Foreground Wait Events table (or Top 5 Timed Foreground Events). These tables list the class (events) and the portion ...
  • Table Tennis题目答案参考注意 题目 A table tennis club has N tables available to the public. The tables are numbered from 1 to N. For any pair of players, if there are some tables open when they arrive...
  • 可以将文章内容翻译成中文,广告屏蔽插件可能会导致该功能失效(如失效,请关闭广告屏蔽插件后再试):问题:do all browsers support html table by columns first.i know you can do:but can you build up a table by ...
  • snf /usr/share/zoneinfo/$TIME_ZONE /etc/localtime && echo '$TIME_ZONE' > /etc/timezone RUN mkdir logs ADD ./${POJO_SERVICE_NAME}.jar ./${POJO_SERVICE_NAME}.jar ADD service.sh ./service.sh ADD wait-for...
  • 等待栈如下: (gdb) bt #0 tdc_wait_for_old_version (thd=0x7ffed8094550, db=0x7ffed802e5c0 "test", table_name=0x7ffed802e5d8 "e01", wait_timeout=31536000, deadlock_weight=0) at /opt/mysql/...
  • MySQLAlter Table注意事项

    2021-01-27 11:22:59
    ========================================================================ALTER TABLE 和FLUSH TABLE导致的间接等待场景:1、会话A执行耗时较长的操作;2、会话B执行ALTER TABLE 或FLUSH TABLES等操作时,会向...
  • acquire with WAIT timeout expired ORA-06512: at "TRA_DEVICE_LIVE", line 17 ORA-04088: error during execution of trigger 'TRA_DEVICE_LIVE'" while inserting e new data into T_EVENT. I've this problem ...
  • I’ll probably have to add a process table that stores the child pidsand have to use waitpid – not immideately,but after some time haspassed – which is a problem,because the running time of the ...
  • 42) at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:109) at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:95) at org.hibernate....
  • 分布式事务解决方案Seata实战-AT模式

    千次阅读 2021-01-11 20:21:08
    微服务调用者 io.seata.rm.datasource.exec.LockWaitTimeoutException: Global lock wait timeout at io.seata.rm.datasource.exec.LockRetryController.sleep(LockRetryController.java:50) ~[seata-all-0.9.0.jar...
  • A table tennis club has N tables available to the public. The tables are numbered from 1 to N. For any pair of players, if there are some tables open when they arrive, they will be assigned to the ...
  • Please wait...2018-12-27T23:10:32.887533Z 0 [Note] InnoDB: File './ibtmp1' size is now 12MB.2018-12-27T23:10:32.888429Z 0 [Note] InnoDB: 96 redo rollback segment(s) found. 96redo rollback segment(s) ...
  • 文章目录一、AT组件概念1.1 AT命令概念1.2 rtthread AT组件资源占用1.3 AT组件功能AT Server:AT Client:二、AT组件api2.1 server api2.1.1 api2.1.2 server api示例2.2 client api2.2.1 client 处理响应数据api...
  • 问题 一个未预期的错误发生在Spoon: ...at org.pentaho.di.ui.trans.steps.userdefinedjavaclass.UserDefinedJavaClassDialog.treeDblClick(UserDefinedJavaClassDialog.java:1557) at org.pentaho.di.ui.trans.steps

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 59,164
精华内容 23,665
关键字:

attablewait