精华内容
下载资源
问答
  • } } } } Exception in thread "main" org.apache.flink.api.common.typeutils.CompositeType$InvalidFieldReferenceException: Cannot reference field by position on GenericType<scala.Tuple2>Referencing a ...

    代码混用

    在java代码编写代码时候不知不觉就引入了,引入了scala包,

      <dependency>
               <groupId>org.apache.flink</groupId>
               <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
               <version>${flink.version}</version>
           </dependency>
           
    
    package cn.putact.datastream;
    
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;
    **import scala.Tuple2;**
    
    public class NcWordCount {
        public static void main(String[] args) throws Exception{
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            DataStream<Tuple2<String, Integer>> dataStream = env
                    .socketTextStream("localhost", 9999)
                    .flatMap(new Splitters())
                    .keyBy(value -> value._1)
                    .timeWindow(Time.seconds(5))
                    .sum(1);
            dataStream.print();
    
            env.execute("Window WordCount");
        }
    
        public static class Splitters implements FlatMapFunction<String,Tuple2<String,Integer>>{
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
    
                for (String word:s.split(" ")){
                    collector.collect(new Tuple2<String ,Integer>(word,1));
                }
            }
        }
    }
    
    
    
    Exception in thread "main" org.apache.flink.api.common.typeutils.CompositeType$InvalidFieldReferenceException: Cannot reference field by position on GenericType<scala.Tuple2>Referencing a field by position is supported on tuples, case classes, and arrays. Additionally, you can select the 0th field of a primitive/basic type (e.g. int).
    	at org.apache.flink.streaming.util.typeutils.FieldAccessorFactory.getAccessor(FieldAccessorFactory.java:87)
    	at org.apache.flink.streaming.api.functions.aggregation.SumAggregator.<init>(SumAggregator.java:43)
    	at org.apache.flink.streaming.api.datastream.WindowedStream.sum(WindowedStream.java:1352)
    	at cn.putact.datastream.NcWordCount.main(NcWordCount.java:21)
    

    调试了半天最后发现java代码混合了scala报错,idea编译是通过的,最好写纯java代码,mavn依赖去掉scala 包,代码中用java tuple元祖就好

    //import scala.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple2;
    
    展开全文
  • Tuple2 报错Incompatible equality constrain T1 and String **解决方式:**可能是spark_core依赖包版本问题,更换一个版本就行了。

    Tuple2 报错Incompatible equality constrain T1 and String

    问题1

    **解决方式:**可能是spark_core依赖包版本问题,更换一个版本就行了。

    展开全文
  • Master当前消息类型:Tuple2 这里第二次匹配case的时候竟然是元组类型,仔细看debug, 难怪原来使用case content: PageContent一直匹配不成功。。原来message消息类型不是纯粹的就是消息本身的类型,可能还...

    使用UntypedAbstractActor时,出现message匹配不上自定义的对象,但是debug时发现类型确实是,而且用了case class,各种方法都不行

    类结构,最终有效代码:

    abstract class Master(latch: CountDownLatch) extends UntypedAbstractActor {
    override def onReceive(message: Any) = {
    
        logger.info("master,actor is:" + self)
        logger.info("Master当前消息类型:" + message.getClass.getSimpleName)
        message match {
            //启动
            case start: String => {
                logger.info("=======================start================")
                visitedPageStore.add(start)
                getParser() ! visitedPageStore.getNext()
            }
            //页面
            case (content: PageContent, _) => {
                logger.info("========================page==============")
                getIndexer() ! content
                //存储待访问页面链接
                visitedPageStore.addAll(content.getLinksToFollow())
                if (visitedPageStore.isFinished()) {
                    //完成了则提交
                    getIndexer() ! COMMIT_MESSAGE
                } else {
                    //继续获取下一个页面
                    for (page <- visitedPageStore.getNextBatch()) {
                        getParser() ! page
                    }
                }
    
            }
            //索引
            case (indexedMessage: IndexedMessage, _) => {
                logger.info("====================index=================")
                visitedPageStore.finished(indexedMessage.getPath)
                if (visitedPageStore.isFinished())
                    getIndexer() ! COMMIT_MESSAGE
    
            }
            //提交
            case (COMMITTED_MESSAGE, _) => {
                logger.info("======================end================")
                logger.info("Shutting down, finished")
                getContext().system.terminate()
                latch.countDown()
            }
            case _ => {
                logger.info("Unknown execution steps")
            }
        }
    
    }
    }
    子类:
    class SimpleActorMaster(latch: CountDownLatch) extends Master(latch) {}
    

    打印当前message的类型输出如下:

    Master当前消息类型:String

    Master当前消息类型:Tuple2

    这里第二次匹配case的时候竟然是元组类型,仔细看debug,

    难怪原来使用case content: PageContent一直匹配不成功。。原来message消息类型不是纯粹的就是消息本身的类型,可能还携带了actor地址。。。

     

    展开全文
  • 今天在写spark 提数的时候遇到一个异常,如下Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to scala.Tuple2 at $anonfun$1$$anonfun...

    今天在写spark 提数的时候遇到一个异常,如下

    Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to scala.Tuple2
      at $anonfun$1$$anonfun$apply$1.apply(<console>:27)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
      at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
      at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
      at scala.collection.AbstractTraversable.map(Traversable.scala:104)
      at $anonfun$1.apply(<console>:27)
      at $anonfun$1.apply(<console>:27)
      at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
      at scala.collection.Iterator$class.foreach(Iterator.scala:893)
      at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
      at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:917)
      at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:917)
      at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
      at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
      at org.apache.spark.scheduler.Task.run(Task.scala:99)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      at java.lang.Thread.run(Thread.java:745)

    df的schema如下:

    root
     |-- cityId: long (nullable = true)
     |-- countryId: long (nullable = true)
     |-- outline: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- _1: double (nullable = true)
     |    |    |-- _2: double (nullable = true)
     |-- provinceId: long (nullable = true)
     |-- townId: long (nullable = true)
    

    使用如下方法在提取outline 字段时,报错

    val outline = df.select("outline").collect().map(row => row.getAs[Seq[(Double,Double)]]("outline"))

    解决方法,我已知两种:
    第一种,在生成数据时,定义case class对象,不存储tuple,就我个人而言,不适用,数据已经生成完毕,再次生成数据需要2天的时间
    第二种方法:

     val lines = json.filter(row => !row.isNullAt(2)).select("outline").rdd.map(r => {
          val row:Seq[(Double,Double)] = r.getAs[Seq[Row]](0).map(x =>{(x.getDouble(0),x.getDouble(1))})
          row
        }).collect()(0)

    至于报错的原因,google了一下,我觉得有一种说法可信:
    在row 中的这些列取得时候,要根据类型取,简单的像String,Seq[Double] 这种类型就可以直接取出来,但是像 Seq[(Double,Double)] 这种类型直接取得花就会丢失schema信息,虽然值能取到,但是schema信息丢了,在dataFrame中操作的时候就会抛错

    展开全文
  • 在Aggregate状态获取时,为了方便起见可以将聚合函数简化为Tuple2<Integer, Long>类型,但是Tuple2<Integer, Long>的类型如何表示呢? 解决: 表示方式如下: Types.TUPLE(Types.INT, Types.LONG) ...
  • Tuple元组类型使用以及Lambda编写flink程序正确姿势(方法与避坑)
  • Java实现好用的Tuple

    千次阅读 2018-11-18 11:38:20
    public abstract class Tuple { //&lt;A&gt;:声明此方法为泛型方法,该方法持有一个类型A //这里使用Optional是为了提醒使用返回值的方法做非Null检查 public abstract &lt;A&gt; Optional&...
  • Scala 入门-元组(Tuple

    千次阅读 2020-01-07 23:53:37
    val ingredient : (String, Int) = ("Sugar" , 25) // Tuple2[String, Int] // 访问元素 println(ingredient._1) // Sugar println(ingredient._2) // 25 如何解构元组数据 在 () 中定义与元组包含元素数量...
  • Java元组Tuple

    千次阅读 2019-09-02 17:34:04
    概念2. 使用2.1 依赖Jar包2.2 基本使用2.2.1 直接调用2.2.2 自定义工具类2.2.3 示例代码 1. 概念   Java中的Tuple是一种数据结构,可存放多个元素,每个元素的数据类型可不同。Tuple与List集合类似,但是不同的是...
  • scala中的tuple元组

    千次阅读 2017-08-01 09:22:36
     二元祖是Tuple2 class的实例,三元祖是Tuple3 class的实例。下面是创建二元祖最简单的方式: scala> val a = ( "AL" ,  "Alabama" ) a: ( String ,  String ) = (AL,Alabama)  你还可以使用下面的...
  • scala中Tuple元组大全

    2020-03-03 19:21:37
    美图欣赏: 一.背景 元组在操作Spark中还是非常多的 二.元组(Tuple) 映射是K/V对偶的集合,对偶是元组的最简单形式,元组可以装着多个不同类型的值,是不同类型的值的聚集。... val tuple = (1, "Jackson" , ...
  • Python元组(tuple)详解

    2020-07-19 15:25:13
    举例介绍: 代码部分: #一般定义的形式: tuple=(1,2,3,4,5,6,'a',"您好") tuple2="1","2","3",4,5,6,'b' #查看类型以及输出元组内的内容 print(type(tuple)) print(type(tuple2)) print(tuple) print(tuple2) #空元组...
  • public class WhitelistBroadcastProcess extends BroadcastProcessFunction>, Tuple2>, Tuple2>> { @Override public void processElement(Tuple3> value, ReadOnlyContext ctx, Collector>> out) throws ...
  • Python基础—tuple元组

    2019-06-21 20:20:55
    创建元组 在这里插入代码片 元组的连接/组合 使用“+”连接,将tuple1与tuple2中的元素取出重新组合成一个新的元组并返回 元组的截取 tuple1[start
  • Python 之元组(tuple

    2020-07-28 13:21:15
    元组 一、元组的特点: 1、与列表相似,不同之处就在于元组的元素不能被修改。 2、列表使用的是中括号“[]”,元组使用的是小括号“()”。 3、列表属于可变类型,元组属于不可变类型。...tuple2 = (1,2,3,4,5,6,
  • Scala 集合之 Tuple

    千次阅读 2018-07-15 23:39:43
    println(s"外层的key:${tuple2._1} 内层的key:${tuple2._2._1} value:${tuple2._2._2}") //创建二元组的第二种方式 println("--\"key\" -> \"value\"--") var tuple3 = "key" -> 1 println(s"key:${tuple3._1} value...
  • java Tuple 元组

    万次阅读 2015-08-18 14:09:26
     Tuple2,T>可以充当两个对象的容器,该容器内保持两个对象的引用,通过方法_1(), _2()获取第一个和第二个对象的值,在此并没有直接返回对象,而是返回一个Optional对象,这是提醒使用返回值的方法做非Null检查;...
  • Python中的元组(Tuple)

    2018-11-10 13:56:53
    tuple: 元组一旦创建就不可更改 ...tuple2=(1,) tuple3=(1,2,3,4) print(tuple) print(type(tuple)) print('*'*33) print(tuple1) print(type(tuple1)) print('*'*33) print(tuple2) print(type(tuple2)...
  • Tuple数组

    2018-07-13 14:48:40
    Tuple1[1]:=2 *批里改变数组元素的值 Tuple1[1,3,5]:='abc' *批里给Tup1e数组赋值,其值为e到99连续数值 Tuple3:=[0:99] *批里给Tuple数组赋值,其值为3到200连续数值,步长为1 Tuple4:=[0:3:99] ...
  • scala中的tuple1,2,3

    千次阅读 2017-05-30 17:57:03
    通过下标_n取数据不多说了,...1、Tuple 和Function 和Producct一样最多只支持22个元素  比如 (0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21) 这样是没问题的  但是(0,1,2,3,4,5,6,7,8,9,10,11,12,13
  • tuple不定值数组

    2021-01-20 13:39:11
    1、tuple 2、函数 3、测试 1、tuple tuple扩展了pair将两个元素看成一个单元的功能,实现了可以将任意数量元素当作一个单元的功能。 2、函数 tuple t; 以n个给定类型的元素建立tuple tuple t(v1,v2...vn);建立tuple...
  • Java元组Tuple使用实例

    万次阅读 2018-05-28 17:17:53
    为什么使用元组tuple? 元组和列表list一样,都可能用于数据存储,包含多个数据;但是和列表不同的是:列表只能存储相同的数据类型,而元组不一样,它可以存储不同的数据类型,比如同时存储int、string、list等,...
  • Flink中Tuple类型

    千次阅读 2020-06-22 19:54:48
    Types.TUPLE(TypeInformation.of(Long.class),TypeInformation.of(Integer.class));
  • scala中Tuple简单使用

    2018-07-08 23:04:00
    /** * Tuple简单使用记录 * 最大22个参数 */ object TupleUse { ... def main(args: Array[String]): Unit = { ... val t = ("yxj", 30) // Tuple2 println(t._1) // 访问Tuple中的数据,从1开始,带上下划...
  • Scala中Tuple(元组)的使用

    万次阅读 2016-11-30 22:17:32
    在我们使用某些编程语言开发时,有些时候总是希望函数(方法)返回多个值。一般情况下我们有几种选择,下面我们以...public void func1(String arg1,String arg2){ arg1 = "abc"; arg2 = "bcd"; }这种方式代码比较
  • 不BB直接上代码: object Test { def main(args: Array[String]): Unit = { val a = (&quot;a&quot;, 1, 9.9) var rs = &quot;&quot; a.productIterator.foreach(v=&amp;...}
  • 主要介绍了简单了解python元组tuple相关原理,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
  • 【Python】Tuple遍历

    千次阅读 2019-08-17 15:38:38
    小白之前博文已经写了Dtaframe、set等的遍历,这篇博文来总结下遍历TupleTuple一旦创建后,元素不可变,遍历和List其实是一样的,下面来看下吧~ 首先,创建一个tuple: ...对于方法1、2、3、4都...
  • Tuple2<String, Integer>> wordAndOne = .... 1. 单个字段keyBy 用字段位置 wordAndOne.keyBy(0) 用字段表达式 wordAndOne.keyBy(v -> v.f0) 2. 多个字段keyBy 用字段位置 wordAndOne.keyBy(0, 1) ...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 248,571
精华内容 99,428
关键字:

tuple2