  • Flink Problem Roundup

    2020-11-24 11:08:34

    1. Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job
    The job cannot be found: Flink has hung, so restart the Flink service.
    2. org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Field types of query result and registered TableSink do not match.
    Field types do not match: with functions like concat, every argument must be a string.
    concat(CURRENT_DATE, '', name) fails because CURRENT_DATE is not a string; cast it first:
    concat(
      cast(CURRENT_DATE as string),
      '',
      name
    )

  • Flink problem notes

    2020-01-09 14:03:57
    1. A submitted Flink program fails to restart
      1. Error log:
              (log screenshot not preserved)
              The log shows that when the Flink program restarts automatically it looks for files under /tmp. Since the Linux /tmp directory is shared by many programs and by the operating system itself, accidental deletion of those files is hard to avoid.
      2. Solution
              Flink's flink-conf.yaml has a configuration item named io.tmp.dirs that decides where temporary files created while the program runs are stored. It is recommended to point this at a Flink-dedicated directory.
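              For instance, one might add a line like the following to flink-conf.yaml (the directory name is only an illustration):

              io.tmp.dirs: /data/flink/tmp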
       
    2. The Flink cluster cannot be stopped with the stop-cluster.sh script
      1. Symptom
              (log screenshot not preserved)
              Stopping the cluster with the script fails: the corresponding Flink services cannot be found on the machines.
      2. Solution
              The config.sh script in Flink's /bin directory contains a setting named DEFAULT_ENV_PID_DIR that decides where Flink writes its pid files; it defaults to /tmp. Since the Linux /tmp directory is shared by many programs and by the operating system itself, accidental deletion is hard to avoid. The log above appears precisely because the pid files were deleted, leaving Flink unable to find the process ids on the machine. Change this default to a Flink-dedicated directory.
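              For instance, in config.sh (the directory name is only an illustration):

              DEFAULT_ENV_PID_DIR="/data/flink/pids"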
    3. Flink SQL on Hive
      Flink version: 1.11.1
      A SQL statement queried a Hive table with a GROUP BY, taking the max of a large number of columns. Simplified example:
      select user_id, max(column1), max(column2), max(column3)
      from test.test
      group by user_id;

      The SELECT actually contained more than 150 max() expressions, and running it failed with:
       

      java.lang.RuntimeException: Could not instantiate generated class 'LocalSortAggregateWithKeys$1975'
          at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:67)
          at org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.createStreamOperator(CodeGenOperatorFactory.java:40)
          at org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:70)
          at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:470)
          at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
          at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:459)
          at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
          at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:459)
          at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
          at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:155)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:453)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
          at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
          at java.lang.Thread.run(Thread.java:748)
      Caused by: org.apache.flink.util.FlinkRuntimeException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
          at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:68)
          at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:78)
          at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:65)
          ... 14 more
      Caused by: org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
          at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203)
          at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
          at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
          at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:66)
          ... 16 more
      Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
          at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:81)
          at org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:66)
          at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742)
          at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
          at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319)
          at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
          at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
          ... 19 more
      Caused by: java.lang.StackOverflowError
          at org.codehaus.janino.CodeContext.extract16BitValue(CodeContext.java:700)
          at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:478)
          at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:557)
          at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:557)
      

      The key error is the StackOverflowError: the generated code nests method calls too deeply.
      Experimenting showed that a single SQL statement cannot hold too many aggregate functions; the ceiling is somewhere around 120. If you need more than about 100 aggregates, split them across two SQL statements and then merge the two result sets, as sketched below.
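      A minimal sketch of that workaround with the Table API, assuming hypothetical column names c1...c150 and that the Hive catalog holding test.test is already registered (only a couple of the MAX() columns are spelled out):

      import org.apache.flink.table.api.EnvironmentSettings;
      import org.apache.flink.table.api.Table;
      import org.apache.flink.table.api.TableEnvironment;

      public class SplitAggregates {
          public static void main(String[] args) {
              TableEnvironment tEnv = TableEnvironment.create(
                      EnvironmentSettings.newInstance().inBatchMode().build());

              // First half of the aggregates (stays below the ~120-function ceiling).
              tEnv.executeSql(
                  "CREATE TEMPORARY VIEW agg_part1 AS " +
                  "SELECT user_id, MAX(c1) AS m1, MAX(c2) AS m2 /* ... MAX(c75) */ " +
                  "FROM test.test GROUP BY user_id");

              // Second half.
              tEnv.executeSql(
                  "CREATE TEMPORARY VIEW agg_part2 AS " +
                  "SELECT user_id, MAX(c76) AS m76, MAX(c77) AS m77 /* ... MAX(c150) */ " +
                  "FROM test.test GROUP BY user_id");

              // Merge the two halves back into one row per key.
              Table merged = tEnv.sqlQuery(
                  "SELECT a.user_id, a.m1, a.m2, b.m76, b.m77 " +
                  "FROM agg_part1 a JOIN agg_part2 b ON a.user_id = b.user_id");
              merged.execute().print();
          }
      }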

    4. Too many calls to the get_json_value function

      The processed results (JSON strings) were written to a Pulsar topic whose schema was very wide, more than 200 fields, each one extracted with a call to get_json_value (similar to Hive's get_json_object). Processing turned out to be very slow.

      The fix: keep the result as a single JSON-string field, and let the downstream job extract the fields it needs with get_json_value, or inside a custom UDF, as sketched below.
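      A hedged sketch of the downstream parse-once approach (class and field names are hypothetical; Jackson is used here, but any JSON parser works). The point is that the JSON string is parsed exactly once per record instead of once per extracted field:

      import com.fasterxml.jackson.databind.JsonNode;
      import com.fasterxml.jackson.databind.ObjectMapper;
      import org.apache.flink.api.common.functions.RichMapFunction;
      import org.apache.flink.configuration.Configuration;

      // Output type is a plain POJO; only two of the ~200 fields are shown.
      class WideRecord {
          public String userId;
          public String itemId;
      }

      public class ParseJsonOnce extends RichMapFunction<String, WideRecord> {
          private transient ObjectMapper mapper;

          @Override
          public void open(Configuration parameters) {
              mapper = new ObjectMapper(); // created once per task, not per record
          }

          @Override
          public WideRecord map(String json) throws Exception {
              JsonNode root = mapper.readTree(json); // single parse of the whole string
              WideRecord r = new WideRecord();
              r.userId = root.path("user_id").asText();
              r.itemId = root.path("item_id").asText();
              // ... read the remaining fields from the same parsed tree ...
              return r;
          }
      }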


  • Flink problem summary

    2019-12-20 17:47:19

    1. Errors when running locally

    1. Memory manager has been shut down. (org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.)

    Symptom: the stream job starts processing normally, then stops after a few seconds. Debugging and the logs show that the Mini Cluster had been shut down.
    The same code and dependencies ran fine on a colleague's machine, so it had to be an environment problem, though it wasn't obvious where.

    Cause: the JDK version. Even minor builds of JDK 1.8 differ; mine was 1.8.0_25.
    Switching to 1.8.0_162 fixed it!

    Error shown when the logging dependency is missing:

    [Source: Custom Source -> Map -> Timestamps/Watermarks -> Map (1/1)] ERROR org.apache.flink.runtime.taskmanager.Task - FATAL - exception in resource cleanup of task Source: Custom Source -> Map -> Timestamps/Watermarks -> Map (1/1) (5794d243fa9118faec953fa5ef3499c2).
    java.lang.IllegalStateException: Memory manager has been shut down.
    	at org.apache.flink.runtime.memory.MemoryManager.releaseAll(MemoryManager.java:480)
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:821)
    	at java.lang.Thread.run(Thread.java:745)
    [Source: Custom Source -> Map -> Timestamps/Watermarks -> Map (1/1)] ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor - FATAL - exception in resource cleanup of task Source: Custom Source -> Map -> Timestamps/Watermarks -> Map (1/1) (5794d243fa9118faec953fa5ef3499c2).
    java.lang.IllegalStateException: Memory manager has been shut down.
    	at org.apache.flink.runtime.memory.MemoryManager.releaseAll(MemoryManager.java:480)
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:821)
    	at java.lang.Thread.run(Thread.java:745)
    
    

    With INFO logging enabled, the error log reads:

    [main] INFO org.apache.flink.runtime.minicluster.MiniCluster - Shutting down Flink Mini Cluster
    [main] INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Shutting down rest endpoint.
    [flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Stopping TaskExecutor akka://flink/user/taskmanager_0.
    [flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager - Shutting down TaskExecutorLocalStateStoresManager.
    [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Map -> Timestamps/Watermarks -> Map (1/1) (5794d243fa9118faec953fa5ef3499c2) switched from RUNNING to FAILED.
    org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
    	at org.apache.flink.runtime.taskexecutor.TaskExecutor.postStop(TaskExecutor.java:308)
    	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.postStop(AkkaRpcActor.java:105)
    	at akka.actor.Actor$class.aroundPostStop(Actor.scala:515)
    	at akka.actor.UntypedActor.aroundPostStop(UntypedActor.scala:95)
    	at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
    	at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
    	at akka.actor.ActorCell.terminate(ActorCell.scala:374)
    	at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467)
    	at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
    	at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
    	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260)
    	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job WindowTest (018d08d18bbbca742a9eeabcb2dcdd07) switched from state RUNNING to FAILING.
    org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
    	at org.apache.flink.runtime.taskexecutor.TaskExecutor.postStop(TaskExecutor.java:308)
    	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.postStop(AkkaRpcActor.java:105)
    	at akka.actor.Actor$class.aroundPostStop(Actor.scala:515)
    	at akka.actor.UntypedActor.aroundPostStop(UntypedActor.scala:95)
    	at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
    	at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
    	at akka.actor.ActorCell.terminate(ActorCell.scala:374)
    	at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467)
    	at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
    	at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
    	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260)
    	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    [flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.io.disk.iomanager.IOManager - I/O manager removed spill file directory C:\Users\zhoujie\AppData\Local\Temp\flink-io-d025a7f2-95de-47e8-b13a-cf8abbe67ccf
    [flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.io.network.NetworkEnvironment - Shutting down the network environment and its components.
    [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Window(SlidingEventTimeWindows(10000, 5000), EventTimeTrigger, ScalaReduceFunction, PassThroughWindowFunction) -> Map -> Sink: Unnamed (1/1) (3863b23bd83cd0e8fd6b38f7ed543d5c) switched from RUNNING to CANCELING.
    [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Window(SlidingEventTimeWindows(10000, 5000), EventTimeTrigger, ScalaReduceFunction, PassThroughWindowFunction) -> Map -> Sink: Unnamed (1/1) (3863b23bd83cd0e8fd6b38f7ed543d5c) switched from CANCELING to CANCELED.
    [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Try to restart or fail the job WindowTest (018d08d18bbbca742a9eeabcb2dcdd07) if no longer possible.
    [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job WindowTest (018d08d18bbbca742a9eeabcb2dcdd07) switched from state FAILING to FAILED.
    org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
    	at org.apache.flink.runtime.taskexecutor.TaskExecutor.postStop(TaskExecutor.java:308)
    	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.postStop(AkkaRpcActor.java:105)
    	at akka.actor.Actor$class.aroundPostStop(Actor.scala:515)
    	at akka.actor.UntypedActor.aroundPostStop(UntypedActor.scala:95)
    	at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
    	at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
    	at akka.actor.ActorCell.terminate(ActorCell.scala:374)
    	at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467)
    	at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
    	at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
    	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260)
    	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    [ForkJoinPool.commonPool-worker-9] INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Removing cache directory C:\Users\zhoujie\AppData\Local\Temp\flink-web-ui
    [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Could not restart the job WindowTest (018d08d18bbbca742a9eeabcb2dcdd07) because the restart strategy prevented it.
    org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
    	at org.apache.flink.runtime.taskexecutor.TaskExecutor.postStop(TaskExecutor.java:308)
    	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.postStop(AkkaRpcActor.java:105)
    	at akka.actor.Actor$class.aroundPostStop(Actor.scala:515)
    	at akka.actor.UntypedActor.aroundPostStop(UntypedActor.scala:95)
    	at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
    	at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
    	at akka.actor.ActorCell.terminate(ActorCell.scala:374)
    	at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467)
    	at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
    	at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
    	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260)
    	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    [ForkJoinPool.commonPool-worker-9] INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Shut down complete.
    [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping checkpoint coordinator for job 018d08d18bbbca742a9eeabcb2dcdd07.
    [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore - Shutting down
    [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Closing the SlotManager.
    [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Suspending the SlotManager.
    [flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Stop job leader service.
    [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.jobmaster.JobMaster - Close ResourceManager connection a60ed1ebdf3f91a90bb6f837a2956c32: ResourceManager leader changed to new address null.
    [flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Stopped TaskExecutor akka://flink/user/taskmanager_0.
    [Source: Custom Source -> Map -> Timestamps/Watermarks -> Map (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Map -> Timestamps/Watermarks -> Map (1/1) (5794d243fa9118faec953fa5ef3499c2) switched from RUNNING to FAILED.
    java.lang.RuntimeException: Buffer pool is destroyed.
    	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
    	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
    	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
    	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
    	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
    	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
    	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
    	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
    	at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:67)
    	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
    	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
    	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
    	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
    	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
    	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
    	at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
    	at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
    	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
    	at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89)
    	at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154)
    	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:665)
    	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
    	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
    	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    	at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
    	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:244)
    	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:218)
    	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:236)
    	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.getBufferBuilder(RecordWriter.java:229)
    	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:149)
    	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:128)
    	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:101)
    	at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
    	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
    	... 34 more
    [Source: Custom Source -> Map -> Timestamps/Watermarks -> Map (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: Custom Source -> Map -> Timestamps/Watermarks -> Map (1/1) (5794d243fa9118faec953fa5ef3499c2).
    [Source: Custom Source -> Map -> Timestamps/Watermarks -> Map (1/1)] ERROR org.apache.flink.runtime.taskmanager.Task - FATAL - exception in resource cleanup of task Source: Custom Source -> Map -> Timestamps/Watermarks -> Map (1/1) (5794d243fa9118faec953fa5ef3499c2).
    java.lang.IllegalStateException: Memory manager has been shut down.
    	at org.apache.flink.runtime.memory.MemoryManager.releaseAll(MemoryManager.java:480)
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:821)
    	at java.lang.Thread.run(Thread.java:745)
    [Source: Custom Source -> Map -> Timestamps/Watermarks -> Map (1/1)] ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor - FATAL - exception in resource cleanup of task Source: Custom Source -> Map -> Timestamps/Watermarks -> Map (1/1) (5794d243fa9118faec953fa5ef3499c2).
    java.lang.IllegalStateException: Memory manager has been shut down.
    	at org.apache.flink.runtime.memory.MemoryManager.releaseAll(MemoryManager.java:480)
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:821)
    	at java.lang.Thread.run(Thread.java:745)
    
    
  • Flink problem log

    2020-06-29 16:13:59

    More buffers requested available than totally available

    While running an offline metric job for an algorithm on Flink, it failed with the following error.

    Map (Map at com.xx.home.ItemFilterDay$.main(ItemFilterDay.scala:186)) -> Combine (GroupReduce at org.apache.flink.api.scala.GroupedDataSet.first(GroupedDataSet.scala:480)) (5/8) (96228117fa07e01079d3cba5f0dc742b) switched from RUNNING to FAILED.
    java.lang.IllegalArgumentException: More buffers requested available than totally available.
    	at org.apache.flink.runtime.operators.hash.MutableHashTable.ensureNumBuffersReturned(MutableHashTable.java:1333)
    	at org.apache.flink.runtime.operators.hash.MutableHashTable.createPartitions(MutableHashTable.java:1122)
    	at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:918)
    	at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:631)
    	at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
    	at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashJoinIterator.callWithNextKey(NonReusingBuildFirstHashJoinIterator.java:116)
    	at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
    	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:504)
    	at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
    	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
    	at java.lang.Thread.run(Thread.java:748)
    

    The Flink launch command was:

    flink run -m yarn-cluster -yqu batch -ynm algo-job -d -ys 8 -ytm 27648 -yD containerized.heap-cutoff-ratio=0.1 -yD taskmanager.memory.off-heap=true -yD taskmanager.memory.size=200m -c com.recsys.home.ShopFilterOneday /home/flink/submitjar/other/batch/flink-algo/0.2/flink-algo-0.0.1.jar
    

    At first this looked like insufficient memory, but even after tripling the memory the same error appeared.

    Reading the Flink source reveals the cause: a hash join must keep a certain number of free segments available. Restoring the TaskManager's buffer memory to the defaults, i.e. going back to the original, plain command, made it work:

    flink run -m yarn-cluster -yqu batch -ynm algo-job -d -ys 8 -ytm 27648 -c com.recsys.home.ShopFilterOneday /home/flink/submitjar/other/batch/flink-algo/0.2/flink-algo-0.0.1.jar
    
    /**
    	 * This method makes sure that at least a certain number of memory segments is in the list of free segments.
    	 * Free memory can be in the list of free segments, or in the return-queue where segments used to write behind are
    	 * put. The number of segments that are in that return-queue, but are actually reclaimable is tracked. This method
    	 * makes sure at least a certain number of buffers is reclaimed.
    	 *  
    	 * @param minRequiredAvailable The minimum number of buffers that needs to be reclaimed.
    	 */
    	final void ensureNumBuffersReturned(final int minRequiredAvailable) {
    		if (minRequiredAvailable > this.availableMemory.size() + this.writeBehindBuffersAvailable) {
    			throw new IllegalArgumentException("More buffers requested available than totally available.");
    		}
    		
    		try {
    			while (this.availableMemory.size() < minRequiredAvailable) {
    				this.availableMemory.add(this.writeBehindBuffers.take());
    				this.writeBehindBuffersAvailable--;
    			}
    		}
    		catch (InterruptedException iex) {
    			throw new RuntimeException("Hash Join was interrupted.");
    		}
    	}
    

    java.lang.String cannot be cast to java.math.BigDecimal

    During Flink job development the error "java.lang.String cannot be cast to java.math.BigDecimal" appeared. The first suspicion was the conversion between Table and stream; after ruling that out, the culprit finally turned out to be inside the SQL itself.
    (screenshot of the offending SQL not preserved)

    No key set. This method should not be called outside of a keyed context.

    Flink version 1.10.1, observed at 2020-07-14 21:51:31. The problem came from the keyBy step, whose key logic was
    (a.getUserId() + "_" + created +"_"+ a.getItemId()).hashCode() % 6
    

    With this key, state get or put calls threw an NPE: the state check reported that no key was set, even though while debugging the key clearly existed. At first I suspected keys must not be negative; then I simplified the key generation to something like

    a.getUserId().hashCode() % 6
    

    and it worked. Online write-ups generally say this error comes from calling state inside open(), but that clearly wasn't my case. I'll let it go for now and debug the source later.

    java.lang.NullPointerException: No key set. This method should not be called outside of a keyed context.
    	at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
    	at org.apache.flink.runtime.state.heap.StateTable.checkKeyNamespacePreconditions(StateTable.java:256)
    	at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:236)
    	at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:138)
    	at org.apache.flink.runtime.state.heap.HeapMapState.get(HeapMapState.java:85)
    	at org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
    	at com.xxx.bigdata.flink.streaming.operate.ItemGroupCountKeyedProcessFunction.processElement(ItemGroupCountKeyedProcessFunction.java:51)
    	at com.xxx.bigdata.flink.streaming.operate.ItemGroupCountKeyedProcessFunction.processElement(ItemGroupCountKeyedProcessFunction.java:22)
    	at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
    	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
    	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
    	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
    	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:310)
    	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
    	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
    	at java.lang.Thread.run(Thread.java:748)
    

    The following error came from optimizing a Flink batch job for the algorithm team: their data volume is large and there are many operators, so I tuned the parallelism to improve performance. The job runs fine in production with the first two operators, source and map, both at parallelism 8. But when I lowered the source's parallelism to 2 it failed as below, roughly because changing the parallelism forces data between operators to be serialized, exposing a bug hidden in the code; I handed it back to the algorithm team to fix.

    java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
    	at org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:32)
    	at org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:161)
    	at org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:44)
    	at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
    	at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.serializeRecord(SpanningRecordSerializer.java:71)
    	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:113)
    	at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
    	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
    	at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
    	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:196)
    	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
    	at java.lang.Thread.run(Thread.java:748)
    2020-07-13 20:11:59.466 [DataSource (at org.apache.flink.api.scala.ExecutionEnvironment.createInput(ExecutionEnvironment.scala:360) (org.apache.flink.api.java.io.jdbc.JDBCInputFormat)) (2/2)] INFO  com.aliyun.odps.jdbc.OdpsConnection  ------ connection closed
    2020-07-13 20:11:59.467 [DataSource (at org.apache.flink.api.scala.ExecutionEnvironment.createInput(ExecutionEnvironment.scala:360) (org.apache.flink.api.java.io.jdbc.JDBCInputFormat)) (2/2)] INFO  org.apache.flink.runtime.taskmanager.Task  ------ DataSource (at org.apache.flink.api.scala.ExecutionEnvironment.createInput(ExecutionEnvironment.scala:360) (org.apache.flink.api.java.io.jdbc.JDBCInputFormat)) (2/2) (e5d839982a4d1a29c0115160e0abdcfa) switched from RUNNING to FAILED.
    java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
    	at org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:32)
    	at org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:161)
    	at org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:44)
    	at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
    	at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.serializeRecord(SpanningRecordSerializer.java:71)
    	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:113)
    	at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
    	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
    	at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
    	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:196)
    	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
    	at java.lang.Thread.run(Thread.java:748)
    

    The new state serializer cannot be incompatible

    This one is fairly common in Flink. Because jobs can be restored from a savepoint, we tend to lean hard on that feature: ship the code and let it run, reasoning that any change can be restored from a savepoint later. That leads to abuse. In reality, once the code is live its overall logic is fixed and cannot be freely modified or migrated.
    So far the following situations produce the error below, and all are documented on the official site:
    1. Migrating from Flink 1.9.0 to 1.10.1: Flink SQL does not guarantee full state compatibility, and a job using GROUP BY will definitely fail on migration.
    2. In Flink 1.10.1, if a job starts without a StateTtlConfig for state expiry and one is added later, restoring fails.
    3. The keyBy logic changed.

    2020-07-10 08:59:01
    java.lang.RuntimeException: Error while getting state
    	at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
    	at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
    	at com.xx.bigdata.flink.streaming.function.ComputeScoreMapFuncion.open(ComputeScoreMapFuncion.java:45)
    	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:990)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
    	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
    	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
    	at java.lang.Thread.run(Thread.java:748)
    Caused by: org.apache.flink.util.StateMigrationException: The new state serializer cannot be incompatible.
    	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)
    	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491)
    	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652)
    	at org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197)
    	at org.apache.flink.runtime.state.ttl.TtlStateFactory.createValueState(TtlStateFactory.java:137)
    	at org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126)
    	at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
    	at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
    	at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
    	at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
    	at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
    	... 12 more
    

    References:

    https://www.jianshu.com/p/f3d4867b1c81 (remote debug)

    https://blog.csdn.net/u013036495/article/details/85544050

    https://cwiki.apache.org/confluence/display/FLINK/Remote+Debugging+of+Flink+Clusters?src=contextnavpagetreemode (***)

    https://issues.apache.org/jira/browse/FLINK-18452?jql=text~%22StateMigrationException%3A%20The%20new%20state%20serializer%20cannot%20be%20incompatible%22 (same question)

    https://blog.csdn.net/nazeniwaresakini/article/details/105044149 (blog)

    https://cloud.tencent.com/developer/article/1653407 (***)

    Flink MySQL dimension table: Communications link failure / Connection reset

    When the data in a Flink stream is sparse, e.g. a real-time stream of low-probability events such as refunds, and MySQL serves as the dimension table, there may be no data to join for long stretches, so MySQL's idle-connection check simply closes the connection. The Flink job then throws, and may even fail and exit.
    Three options:

    1. Raise MySQL's idle timeout, wait_timeout. The default is 8 hours and our production sets 24, but this keeps too many connections open, so it's not ideal.
    2. Add a heartbeat, e.g. idleConnectionTestPeriod, to tell MySQL the client is still alive. In Flink SQL I don't yet know how to configure this; others have done it with a C3P0 connection pool. A sketch for the non-SQL case follows the error text below.
    3. To keep things alive, periodically push redundant records into the source and filter them out in code, keeping both the consumer and the dimension-table connection warm. Admittedly this is back-to-front, but it's worth trying when nothing else works.
    com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
    The last packet successfully received from the server was 983,155 milliseconds ago.  The last packet sent successfully to the server was 983,155 milliseconds ago.
    Caused by: java.net.SocketException: Connection reset
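
    A rough sketch of option 2 for the non-SQL case (endpoint, credentials and the 5-minute period are illustrative): a scheduled heartbeat that keeps the connection from ever looking idle to MySQL:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class MysqlKeepalive {
        public static void main(String[] args) throws Exception {
            // Hypothetical endpoint and credentials.
            Connection conn = DriverManager.getConnection(
                    "jdbc:mysql://db-host:3306/dim_db", "user", "pass");

            ScheduledExecutorService heartbeat = Executors.newSingleThreadScheduledExecutor();
            heartbeat.scheduleAtFixedRate(() -> {
                try (Statement st = conn.createStatement()) {
                    st.execute("SELECT 1"); // cheap no-op; resets MySQL's idle timer
                } catch (Exception e) {
                    // reconnect logic would go here
                }
            }, 5, 5, TimeUnit.MINUTES); // period well below wait_timeout
        }
    }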
    

    Flink 1.10.1: Flink SQL GROUP BY SUM order-amount totals written to Redis late

    This morning scared me into a cold sweat. Arriving at the office, I bought something on our company's e-commerce app to spot-check some of my real-time metrics, orders and payments. After paying, my own account data shocked me: a user payment-total profile metric that went live more than a month ago no longer added up. Before paying, my running total was 360 yuan; after paying another 100 yuan, the number still hadn't refreshed half an hour later. That can't be right, these are my Flink real-time metrics! Stranger still, the payment count, computed with near-identical logic, updated correctly. After a frantic round of checking, other accounts showed no such problem, only mine, but a problem like this cannot simply be ignored. So I paid once more, and this time the number was correct and even included the earlier payment. In other words, the Flink SQL state was right; the result had merely been emitted late. Not acceptable; I'll go through the source tonight.

    A few days later I hit a similar problem, this time more clear-cut. The metrics were the total order count ALL_CNT, cohort A's order count A_CNT and cohort B's order count B_CNT, implemented as Flink SQL group counts over cohorts A and B, with ALL_CNT = A_CNT + B_CNT, all three written to Redis as profile metrics. Redis should therefore always satisfy ALL_CNT = A_CNT + B_CNT, and any deviation should be nearly impossible to spot by eye. Yet when I validated the order counts of 200,000 users, 20 rows were wrong: A_CNT plus B_CNT did not equal ALL_CNT. The error pattern was interesting too: sometimes ALL_CNT was right and A_CNT or B_CNT wrong, sometimes the other way round.
    That ALL_CNT was sometimes correct convinced me the Flink state was right; some result row was simply not emitted, or emitted in the wrong order. Replaying the affected users' data did occasionally reproduce the problem, and the logs showed it plainly: the writes were out of order.
    (log screenshot not preserved)

    Once the problem was located everything became simple. One glance at the job's ExecutionGraph showed the process operator at parallelism 6 while the Redis sink ran at 3; setting the sink to 6 made everything normal. Yatta! In hindsight it all makes sense. The lesson: don't set parallelism casually, especially on the sink side; keep it consistent with the upstream operator, as sketched below.
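    A minimal DataStream sketch of the lesson (PrintSinkFunction stands in for the real Redis sink; the operators here are placeholders):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;

    public class SinkParallelismDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Upstream "process" stage runs with parallelism 6.
            DataStream<String> processed = env
                    .fromElements("a", "b", "c")
                    .map(String::toUpperCase)
                    .setParallelism(6);

            // Give the sink the SAME parallelism as the upstream operator. In the
            // incident above the process operator ran at 6 but the Redis sink at 3,
            // which let updates for the same key reach Redis out of order.
            processed
                    .addSink(new PrintSinkFunction<>()) // stand-in for the real Redis sink
                    .setParallelism(6);

            env.execute("sink-parallelism-demo");
        }
    }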

    com.mysql.cj.exceptions.StatementIsClosedException: No operations allowed after statement closed

    This problem is not rare; the most frequent case is a Flink job using MySQL as a dimension table, where the 8-hour timeout closes the JDBC connection. Ours was scarier: a configuration upgrade on one of the BI databases left the database unavailable for about a minute, and every Flink job depending on that BI library broke, whether it used it as a dimension table or as a sink. Fortunately Flink has its own restart strategy, and YARN automatically pulls dead TaskManagers back up, but only as long as you do not try-catch away the exception in the title. I used to be lazy and catch the broadest Exception when processing data; clearly you must instead catch precisely the exceptions you can handle and let the rest propagate upward, as sketched below.
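    A hedged sketch of what "catch precisely" can look like for a JDBC write path (reconnectAndRetry is a hypothetical helper):

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.sql.SQLRecoverableException;
    import java.sql.SQLTransientException;

    public class PreciseCatchSketch {

        // Hypothetical helper: re-open the connection and re-run the statement.
        private void reconnectAndRetry(String sql) { /* ... */ }

        void write(Connection conn, String sql) throws SQLException {
            try (PreparedStatement ps = conn.prepareStatement(sql)) {
                ps.execute();
            } catch (SQLRecoverableException | SQLTransientException e) {
                // A transient, connection-level failure (like the one-minute outage
                // above) that we know how to handle: deal with it explicitly.
                reconnectAndRetry(sql);
            }
            // Anything else propagates, fails the task, and lets Flink's restart
            // strategy and YARN recover the job, instead of being hidden behind a
            // blanket catch (Exception e).
        }
    }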

    Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster

    The usual first reaction is to check whether the YARN queue is out of resources, but when only one particular job fails to submit, the job itself is clearly at fault. Look at the logs with yarn logs -applicationId and you will find the real exception, for instance a dependency conflict. On the client all you see is "Please check if the requested resources are available in the YARN cluster". Don't be misled by it!

    Flink dimension-table data cannot be read

    Failing to read the data was one thing; the trap was that while debugging in IDEA the console showed no error at all: the error was written only to the log file. Once the error log is in hand the problem is easy to solve: numeric columns must not be declared unsigned (fixed in Flink 1.11). The error:

    2020-08-06 14:08:28.596 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  ------ Source: JDBCTableSource(userid) -> SourceConversion(table=[default_catalog.default_database.shop, source: [JDBCTableSource(userid)]], fields=[userid]) -> Calc(select=[userid AS userId, userid AS cnt]) -> SinkConversionToTuple2 -> Flat Map -> Sink: Print to Std. Out (4/4) (00ebbc1b13aa96494bac99b0e6f14801) switched from RUNNING to FAILED.
    java.lang.ClassCastException: java.math.BigInteger cannot be cast to java.lang.Long
    	at org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:32)
    	at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
    	at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
    	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:639)
    	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
    	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
    	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
    	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
    	at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
    

    Error when converting a Flink Table to a DataStream: ValidationException: Field types of query result and registered TableSink do not match

    Exception in thread "main" org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink  do not match.
    Query schema: [userId: BIGINT, itemId: BIGINT, robotUser: BIGINT, riskLevel: INT]
    Sink schema: [f0: RAW('com.wwdz.bigdata.flink.streaming.entity.ResultEntity', ?)]
    	at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:96)
    	at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:229)
    	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
    	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
    

    The cause: the class used in the conversion must not carry Lombok's @Builder. @Builder suppresses the public no-argument constructor, so Flink no longer recognizes ResultEntity as a POJO and falls back to a RAW type, which is exactly the sink schema shown above.

    tableEnv.toAppendStream(resultTable, ResultEntity.class);
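    One way out, sketched under the assumption that the Lombok builder is still wanted: restore the constructors that Flink's POJO analysis needs (removing @Builder altogether works too). Field names follow the query schema above:

    import lombok.AllArgsConstructor;
    import lombok.Builder;
    import lombok.Data;
    import lombok.NoArgsConstructor;

    @Data
    @Builder
    @NoArgsConstructor   // restores the no-arg constructor Flink's POJO rules require
    @AllArgsConstructor  // @Builder needs an all-args constructor once the above is added
    public class ResultEntity {
        public long userId;
        public long itemId;
        public long robotUser;
        public int riskLevel;
    }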

    Flink 1.10.1, using Phoenix JDBC as a sink, fails with: java.lang.NoClassDefFoundError: org/apache/flink/calcite/shaded/com/google/protobuf/MessageOrBuilder

    Full error:
    2020-08-26 16:36:13,726 INFO org.apache.flink.runtime.taskmanager.Task - sum-im -> (filter-bycount -> Filter -> Sink: sink-phoneix, Filter) (1/2) (027bba9c912d22903f3698db52b99957) switched from RUNNING to FAILED.
    java.lang.NoClassDefFoundError: org/apache/flink/calcite/shaded/com/google/protobuf/MessageOrBuilder
    	at org.apache.calcite.avatica.remote.HADriver.createService(HADriver.java:72)
    	at org.apache.calcite.avatica.remote.Driver.createMeta(Driver.java:97)
    	at org.apache.calcite.avatica.AvaticaConnection.<init>(AvaticaConnection.java:121)
    	at org.apache.calcite.avatica.AvaticaJdbc41Factory$AvaticaJdbc41Connection.<init>(AvaticaJdbc41Factory.java:109)
    	at org.apache.calcite.avatica.AvaticaJdbc41Factory.newConnection(AvaticaJdbc41Factory.java:65)
    	at org.apache.calcite.avatica.UnregisteredDriver.connect(UnregisteredDriver.java:138)
    	at org.apache.calcite.avatica.remote.Driver.connect(Driver.java:165)
    	at org.apache.phoenix.queryserver.client.Driver.connect(Driver.java:61)
    	at java.sql.DriverManager.getConnection(DriverManager.java:664)
    	at java.sql.DriverManager.getConnection(DriverManager.java:270)
    	at com.xxx.bigdata.flink.streaming.risk.im.monitor.sink.UserImMonitorCountPhoenixSink.open(UserImMonitorCountPhoenixSink.java:28)
    	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    	at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:990)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
    	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
    at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.ClassNotFoundException: org.apache.flink.calcite.shaded.com.google.protobuf.MessageOrBuilder
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 22 more

    Someone has already put together a temporary fix for this: FLINK-15463. Nice work.

    Could not initialize class org.apache.flink.runtime.entrypoint.parser.CommandLineOptions

    A missing dependency. Add:

    <dependency>
        <groupId>commons-cli</groupId>
        <artifactId>commons-cli</artifactId>
        <version>1.3.1</version>
    </dependency>

    Flink job submission fails on the client: Could not build the program from JAR file.
    Generally the first things to check are the JAR and MainClass paths, but fundamentally you should read the logs, usually the client log under ${FLINK_HOME}/log. The log below shows the specific cause: the command was missing a '-yD', so the parameter configuring the TaskManager memory was parsed as the JAR file.

    org.apache.flink.client.cli.CliArgsException: Could not build the program from JAR file.
    	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:200)
    	at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:893)
    	at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
    	at java.security.AccessController.doPrivileged(Native Method)
    	at javax.security.auth.Subject.doAs(Subject.java:422)
    	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
    	at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
    Caused by: java.io.FileNotFoundException: JAR file does not exist: taskmanager.memory.size=200m
    	at org.apache.flink.client.cli.CliFrontend.getJarFile(CliFrontend.java:717)
    	at org.apache.flink.client.cli.CliFrontend.buildProgram(CliFrontend.java:693)
    	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:197)
    	... 7 more
    

    --- divider ---

    Caused by: org.apache.flink.api.common.InvalidProgramException: Job was submitted in detached mode. Results of job execution, such as accumulators, runtime, etc. are not available. Please make sure your program doesn't call an eager execution function [collect, print, printToErr, count].

    The cause: in detached mode you cannot use eager execution functions such as collect, print, printToErr and count.
    Non-detached mode means the client must wait for the program's result and cannot be interrupted, e.g. when starting directly from IDEA, or from the Linux command line without -d: you have to wait for the program to finish, and you can neither close the window nor press Ctrl+C.
    In detached mode, by contrast, the client exits as soon as the job is submitted to YARN; you can neither see the program's results nor kill it directly.
    As I understand it, an eager function is one that must display its results in a console-like window, without persisting them anywhere such as a file system, so it can only be used together with non-detached mode.

    --- divider ---
    We use Flink to build a real-time data lake, which involves compacting small files with the Flink DataSet API. The DataSet and DataStream APIs differ enormously: the community has done some work on the Source side, but the transform and sink stages are two entirely separate stacks. Worth recording: the classic streaming pattern keyBy + KeyedProcessFunction becomes groupBy + reduceGroup + GroupReduceFunction in batch, as sketched below.
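    A small sketch of that batch-side pattern (the bucket/file data here is made up): all records of one key arrive together in reduce(), much like the per-key stream a KeyedProcessFunction sees.

    import org.apache.flink.api.common.functions.GroupReduceFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    public class CompactSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<Tuple2<String, Long>> files = env.fromElements(
                    Tuple2.of("bucket-0", 10L), Tuple2.of("bucket-0", 20L), Tuple2.of("bucket-1", 5L));

            files.groupBy(0) // batch counterpart of keyBy
                 .reduceGroup(new GroupReduceFunction<Tuple2<String, Long>, String>() {
                     @Override
                     public void reduce(Iterable<Tuple2<String, Long>> values, Collector<String> out) {
                         long total = 0L;
                         String key = null;
                         for (Tuple2<String, Long> v : values) {
                             key = v.f0;
                             total += v.f1;
                         }
                         out.collect(key + " -> " + total); // e.g. merge all files of a bucket
                     }
                 })
                 .print();
        }
    }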

  • Monitoring Flink metrics with a prometheus+pushgateway+grafana stack produced the situation shown in the figure: every so often the metrics disappear, then reappear. 2. Analysis 2.1 Experiment design. Three possibilities: 1: prometheus has a bug...
  • Flink problem record

    2018-10-31 10:22:11
    1. The cluster suddenly went down. Find the Master node's log, open it with vi, Shift + g ... In HDFS, run hadoop fs -ls /flink-checkpoints | grep job-ID to find the checkpoint directory for that id, enter the directory, and fetch /flink-checkpoints/a1cb4cadb79c74ac8d3c7a11b6...
  • 23. Apache Flink FAQ roundup

    2019-06-12 10:58:43
    This post records Flink production and internals questions. The question list (updated from time to time): 1. How does Flink handle backpressure? 2. How does Flink SQL translate into DataStream or DataSet jobs? Describe the process in detail. 3. How is Flink checkpointing implemented? ...
  • Flink troubleshooting: Buffer pool is destroyed.

    2020-06-05 18:18:04
    1. JDK issue: the data producer's JDK version was higher than the Flink consumer's; raising the JDK version solves it. 2. Data source issue (an empty source): since this error often appears together with "Could not forward element to next operator", the data source easily comes under suspicion...
  • Pulsar-Flink-Connector: java.lang.NoSuchFieldError: INIT_AUTH_DATA
  • Flink-ClickHouse-Sink. A sink for ClickHouse: a high-performance library for loading data into ClickHouse, with two triggers for flushing data, a timeout and a buffer size. Version map: Flink 1.3.* → flink-clickhouse-sink 1.0.0; 1.9.* → 1.3.1 ...
  • IDEA keeps gaining popularity, and Flink is spreading through industry and attracting attention, so we will be adding Flink material. To lay the groundwork we show here how to use IDEA; the most important steps are importing a project and running it. This underpins our Flink study...
  • A defining feature of the Flink architecture is stateful computation. Stateful computation: while a task executes, intermediate results produced along the way are stored and made available to subsequent Functions or operators. State: a snapshot of the task's internal data (computation data and metadata attributes)...
  • Pulsar Flink connector. The Pulsar Flink connector uses and implements elastic data processing. See the Chinese documentation for details. Prerequisites: Java 8 or later, Flink 1.9.0 or later, Pulsar 2.4.0 or later. Basics: this section introduces the Pulsar Flink...
  • Error when running a JAR with Flink

    2020-02-26 13:01:15
    Running the JAR with Flink failed. The Flink web UI logs show the cause: the Flink version inside the JAR differed from the version on the VM. Changing the code's Flink version to match the installed one fixes it. Check the installed version with flink -v. Since my Flink version is 1.7...
  • Flink SQL

    2021-01-20 12:51:34
    def main(args: Array[String]): Unit = { //like a sparkcontext val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //switch the time characteristic to eventTime env....
  • The course contains 9 chapters: Flink installation/deployment and quick start, Flink batch API, Flink streaming API, Flink advanced API, Flink Table & SQL, Flink-Action integrated exercises, Flink advanced and new features, Flink multi-language development, and Flink performance tuning. Course contents: ...
  • flink1.12.1 SQL demo: a personal Flink practice demo. The code generates its own data, with suitable datasets for demonstration. pom.xml includes the vast majority of Flink dependencies, so the Maven download takes a while. Environment so far: Windows 10 LTSC, MySQL 8.0.23, IDEA...
  • Continuing from Flink Notes (18): Flink StateBackend introduction & usage, this post covers restoring data from a checkpoint in Flink. Once a Flink job is submitted, it stops only on 1. a program error or 2. a manual cancel. 1. On task failure, configure saving ...
