精华内容
下载资源
问答
  • java累加计数

    千次阅读 2019-01-28 17:49:37
    public class Demo2 { public static void main(String args[]){ int sum = 0;... //计数初值 for(int i=1;i<=10;i++){ //输出1-10 sum+=i; //累加 System.out...
    public class Demo2 {
    	public static void main(String args[]){
    		int sum = 0;							//累加初值
    		int count = 0;							//计数初值
    		for(int i=1;i<=10;i++){					//输出1-10
    			sum+=i;								//累加
    			 System.out.println("1-10之间的整数第"+i+"相加:"+sum);
    			if(sum>20){							//判断累加大于20
    				count++;						//计数累加大于20有几个
    				
    				
    			}
    			
    		}
    		
    		System.out.println("累加值大于20的数有:"+count+"个");
    	}
    
    }

    #生成效果

    展开全文
  • java多线程累加计数

    万次阅读 2018-03-21 15:49:29
    java多线程计数 题目 给定count=0;让5个线程并发累加到1000; 思路 创建一个类MyRunnable,实现Runnable(继承Thread类也可) 定义一个公共变量count(初始值为0),5个线程都可以访问到; 创建5个线程并发递增...

     

    java 多线程 计数


    题目

    给定count=0;让5个线程并发累加到1000;

    思路

    • 创建一个类MyRunnable,实现Runnable(继承Thread类也可)
    • 定义一个公共变量count(初始值为0),5个线程都可以访问到;
    • 创建5个线程并发递增count到1000;

    注意

    这块注意Thread和Runnable类的区别,Thread类是线程类,可以直接new Thread().start运行。而Runnable类是任务类,需要一个线程来承载任务,通过new Thread(new Runnable()).start()来运行任务。

    方法

    方法一

    将count公共变量放到测试类Test的类成员变量里,将MyRunnable类作为Test类的内部类,在Test类的main方法里创建5个线程,实现累加。

    代码

    public class Test {
    	//公共变量
    	int count=0;
    	public static void main(String[] args){
    		//new一个实现Runnable的类
    		Test test=new Test();
    		//创建5个任务
    		MyRunnable myRunnable1=test.new MyRunnable();
    		MyRunnable myRunnable2=test.new MyRunnable();
    		MyRunnable myRunnable3=test.new MyRunnable();
    		MyRunnable myRunnable4=test.new MyRunnable();
    		MyRunnable myRunnable5=test.new MyRunnable();
    		//创建5个线程
    		new Thread(myRunnable1).start();
    		new Thread(myRunnable2).start();
    		new Thread(myRunnable3).start();
    		new Thread(myRunnable4).start();
    		new Thread(myRunnable5).start();
    	}
    	//创建一个实现Runnable的类
    	class MyRunnable implements Runnable{
    		public void run() {
    			while(true){
    				//锁住的是整个MyRunnable类
    				synchronized(MyRunnable.class){
    					if(count>=1000){
    						break;
    					}
    					System.out.println(Thread.currentThread().getName()+":count:"+(++count));
    					//测试时,线程更容易切换
    					Thread.yield();
    				}
    				
    			}
    		}
    		
    	}
    
    }

    方法二

    以上代码没有问题,成功实现5个线程累加count到1000,接下来我们将上边代码稍作修改。

     

    • 将5个线程执行5个任务,修改为5个线程执行同一任务。
    • 将synchronized(MyRunnable.class)修改为synchronized(this)

    代码

    public class Test {
    	//公共变量
    	int count=0;
    	public static void main(String[] args){
    		//new一个实现Runnable的类
    		Test test=new Test();
    		//创建1个任务
    		MyRunnable myRunnable1=test.new MyRunnable();
    //		MyRunnable myRunnable2=test.new MyRunnable();
    //		MyRunnable myRunnable3=test.new MyRunnable();
    //		MyRunnable myRunnable4=test.new MyRunnable();
    //		MyRunnable myRunnable5=test.new MyRunnable();
    		//创建5个线程
    		for(int i=0;i<4;i++){
    			new Thread(myRunnable1).start();
    		}
    //		new Thread(myRunnable2).start();
    //		new Thread(myRunnable3).start();
    //		new Thread(myRunnable4).start();
    //		new Thread(myRunnable5).start();
    	}
    	//创建一个实现Runnable的类
    	class MyRunnable implements Runnable{
    		public void run() {
    			while(true){
    				//锁住的是同一对象
    				synchronized(this){
    					if(count>=1000){
    						break;
    					}
    					System.out.println(Thread.currentThread().getName()+":count:"+(++count));
    					//测试时,线程更容易切换
    					Thread.yield();
    				}
    				
    			}
    		}
    		
    	}
    
    }

     

    以上代码没有问题,成功实现5个线程累加count到1000。 

    虽然结果是一样的,但是代码实现是不一样的,代码一是创建了5个MyRunnable对象,代码二只创建了1个MyRunnable对象。考虑并发时用到的锁就是不一样的,

    代码一和代码二虽然synchronized中的锁不同,但目的都是为了括号中的锁是恒定不变的。

    • synchronized(this)代表锁是this对象,代码二中之所以可以使用this,是因为几个线程使用的this都是同一个对象。
    • synchronized(MyRunnable.class)代表锁是MyRunnable.class.this,因为MyRunnable.class.this是类加载到静态方法区中,是一直存在不变的,代码一中可以使用,当然代码二也可以这样写。
    • 代码一和代码二可以使用更通用的方式就是专门new一个锁对象,这个锁对象可以放在类成员变量里,加上static就可以一直常存。如定义成public static Object lock=new Object();代码一和代码二都可以使用synchronized(lock)来加锁。synchronized(this)这种方式主要是因为书写方便。

    方法三

    使用AtomicInteger类,来实现多线程累加,AtomicInteger类是线程安全的,使用它的优点就是我们不需要在代码里写Synchronized关键字了,这些事都交给它去做了。

    代码

    public class Test {
        static CountDownLatch cdl=new CountDownLatch(1000);;
        static AtomicInteger ai=new AtomicInteger(0);
        public static void main(String[] args) throws InterruptedException{ 
        	ExecutorService exec=Executors.newFixedThreadPool(100);
        	  for (int i = 0; i < 1000; i++) {
        		  exec.execute(new Runnable() {
    				@Override
    				public void run() {
    					System.out.println(Thread.currentThread().getName()+":"+ai.getAndIncrement());
    					cdl.countDown();
    				}
    			});
              }
        	cdl.await();
        	System.out.println(ai.get());
        	exec.shutdown();
        } 
    }

    代码中用到了CountDownLatch类,用法就是给其设定一个初始值1000,然后在不同线程中执行countDown方法,每执行一次,初始值-1,await方法就是等初始值减到0时,停止等待,否则一直等待。

    我在代码里新建了100个线程来并发累加,让我们看下最后结果。

    控制台输出如下:

    可以看到虽然输出不是按照顺序输出的,但是最后的结果是我们想要的结果,没有出现重复值的情况。

    总结

    这到题目只是举了一个多线程的例子,以及锁的简单知识。在实际应用中,从0累加到1000用多线程是没有意义的。因为根本不会比单线程快。就像让一个人数数,从0数到1000,或者让5个人接替数到1000,应该一个人更快点吧,5个人还要考虑配合的问题。但假如这5个人都是磕巴(口语不好),一个人每读一个数都要停顿1秒,但是让5个人协作,省去中间的等待时间,才是多线程应用的真正意义。
    多线程的真正应用应该是,任务中有等待的时间,这个等待时间如果交给一个线程做就堵塞在这块了。如果交由多个线程去做,会充分利用等待时间,去做其他事情。这才是多线程的意思。在我们日常工作中,像IO,网络,图片处理等,有些地方都是需要等待的,这几块用多线程,可能会提高效率。

    当然,也有一种情况,比如多个用户访问后台接口,每个用户访问其实都是一个单独的线程,假如想计算累计有多少次访问的话,就需要用到多线程累加。

    同类型文章

    感兴趣的也可以参考我的另外一篇文章,多线程计算数组之和

    参考资料

    深入理解synchronized(synchronized锁住的是代码还是对象)

    深入理解java并发之sychronized实现原理

    java中Sychronized用法

    展开全文
  • 网络上大部分文章都没有做到真正的单词累加计数,终于研究完以后成功实现 简单的Kafka生产者 package com.zwj.utils; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import ...

    网络上大部分文章都没有做到真正的单词累加计数,终于研究完以后成功实现

    简单的Kafka生产者

    package com.zwj.utils;
    
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.storage.StorageLevel;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.KafkaUtils;
    import scala.Tuple2;
    
    import java.util.*;
    
    public class Product {
        public static void KafkaProduct(String message){
            String topics = "TuiJian3";
            Properties properties = new Properties();
            properties.put("serializer.class", "kafka.serializer.StringEncoder");
            properties.put("metadata.broker.list", "192.168.42.132:9092,192.168.42.134:9092,192.168.42.135:9092");
            properties.put("request.required.acks", "1");
            ProducerConfig config = new ProducerConfig(properties);
            Producer<String, String> producer = new Producer<String, String>(config);
            List<KeyedMessage<String,String>> messageList = new ArrayList<>();
            KeyedMessage<String,String> message1 = new KeyedMessage<String, String>(topics,message);
            messageList.add(message1);
            producer.send(messageList);
        }
    
    
    
        public static void main(String[] args) {
                    KafkaProduct("hello world");
        }
    }
    

     

    SparkStreaming读取Kafka数据

    和其他不同的是不能使用ReduceByKey应该使用的是updateStateByKey

    package com.zwj.utils;
    
    import com.google.common.base.Optional;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.storage.StorageLevel;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.KafkaUtils;
    import scala.Tuple2;
    
    import java.util.Arrays;
    import java.util.HashMap;
    import java.lang.Iterable;
    import java.util.List;
    
    
    public class SparkStreamingRead {
        public static void SparkStreamingReadProduct() {
            SparkConf conf = new SparkConf().setAppName("ReadKafka").setMaster("local[2]");
            JavaSparkContext sc = new JavaSparkContext(conf);
            JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(5));
            String zkQurum = "hadoop01:2181,hadoop02:2181,hadoop03:2181";
            String group = "zwj-consumer-group";
            String topics = "TuiJian3";
            HashMap<String, Integer> map = new HashMap<>();
            map.put(topics, 2);
            ssc.checkpoint("hdfs://hadoop02:9000/ck-2018-24-004");
            JavaPairReceiverInputDStream<String, String> data = KafkaUtils.createStream(ssc, zkQurum, group, map, StorageLevel.MEMORY_AND_DISK_SER());
            JavaDStream<String> lines = data.map(new Function<Tuple2<String, String>, String>() {
                public String call(Tuple2<String, String> tuple2) {
                    return tuple2._2();
                }
            });
            JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterable<String> call(String lines) {
                    Iterable<String> iterator = Arrays.asList(lines.split(" "));
                    return iterator;
    
                }
            });
            JavaPairDStream<String, Integer> wordCouts = words.mapToPair(new PairFunction<String, String, Integer>() {
    
                @Override
                public Tuple2<String, Integer> call(String s) throws Exception {
                    return new Tuple2<>(s, 1);
                }
            }).updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
                @Override
                public Optional<Integer> call(List<Integer> v1, Optional<Integer> v2) throws Exception {
                    Integer i2 = v2.or(new Integer(0));
                    Integer sum = new Integer(0);
                    for (Integer temp : v1) {
                        sum += temp;
                    }
                    sum += i2;
                    return Optional.of(sum);
                }
            });
    
           /* .updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
                @Override
                public Optional<Integer> call(List<Integer> v1, Optional<Integer> v2) throws Exception {
    
    
                    return Optional.of(sum);
                }
            });*/
            wordCouts.print();
            ssc.start();
            ssc.awaitTermination();
            ssc.close();
    
        }
    
        public static void main(String[] args) {
            SparkStreamingReadProduct();
        }
    }
    

     

    展开全文
  • spark 累加历史主要用到了窗口函数,而进行全部统计,则需要用到rollup函数1 应用场景:1、我们需要统计用户的总使用时长(累加历史)2、前台展现页面需要对多个维度进行查询,如:产品、地区等等3、需要展现的表格头...

    spark 累加历史主要用到了窗口函数,而进行全部统计,则需要用到rollup函数

    1  应用场景:

    1、我们需要统计用户的总使用时长(累加历史)

    2、前台展现页面需要对多个维度进行查询,如:产品、地区等等

    3、需要展现的表格头如: 产品、2015-04、2015-05、2015-06

    2 原始数据:

    69c5a8ac3fa60e0848d784a6dd461da6.pngproduct_code |event_date |duration |

    -------------|-----------|---------|

    1438 |2016-05-13 |165 |

    1438 |2016-05-14 |595 |

    1438 |2016-05-15 |105 |

    1629 |2016-05-13 |12340 |

    1629 |2016-05-14 |13850 |

    1629 |2016-05-15 |227 |

    69c5a8ac3fa60e0848d784a6dd461da6.png

    3 业务场景实现

    3.1 业务场景1:累加历史:

    如数据源所示:我们已经有当天用户的使用时长,我们期望在进行统计的时候,14号能累加13号的,15号能累加14、13号的,以此类推

    3.1.1 spark-sql实现

    69c5a8ac3fa60e0848d784a6dd461da6.png//spark sql 使用窗口函数累加历史数据

    sqlContext.sql("""select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc) as sum_duration

    from userlogs_date""").show+-----+----------+------------+

    |pcode|event_date|sum_duration|

    +-----+----------+------------+

    | 1438|2016-05-13| 165|

    | 1438|2016-05-14| 760|

    | 1438|2016-05-15| 865|

    | 1629|2016-05-13| 12340|

    | 1629|2016-05-14| 26190|

    | 1629|2016-05-15| 26417|

    +-----+----------+------------+

    69c5a8ac3fa60e0848d784a6dd461da6.png

    3.1.2 dataframe实现

    69c5a8ac3fa60e0848d784a6dd461da6.png//使用Column提供的over 函数,传入窗口操作importorg.apache.spark.sql.expressions._

    val first_2_now_window= Window.partitionBy("pcode").orderBy("event_date")

    df_userlogs_date.select(

    $"pcode",

    $"event_date",

    sum($"duration").over(first_2_now_window).as("sum_duration")

    ).show+-----+----------+------------+

    |pcode|event_date|sum_duration|

    +-----+----------+------------+

    | 1438|2016-05-13| 165|

    | 1438|2016-05-14| 760|

    | 1438|2016-05-15| 865|

    | 1629|2016-05-13| 12340|

    | 1629|2016-05-14| 26190|

    | 1629|2016-05-15| 26417|

    +-----+----------+------------+

    69c5a8ac3fa60e0848d784a6dd461da6.png

    3.1.3 扩展 累加一段时间范围内

    实际业务中的累加逻辑远比上面复杂,比如,累加之前N天,累加前N天到后N天等等。以下我们来实现:

    3.1.3.1 累加历史所有:select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc) as sum_duration from userlogs_date

    select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between unbounded preceding and current row) as sum_duration from userlogs_date

    Window.partitionBy("pcode").orderBy("event_date").rowsBetween(Long.MinValue,0)Window.partitionBy("pcode").orderBy("event_date")

    上边四种写法完全相等

    3.1.3.2 累加N天之前,假设N=3select pcode,event_date,sum(duration) over (partition by pcode order by event_date ascrows between 3 preceding and current row) as sum_durationfrom userlogs_dateWindow.partitionBy("pcode").orderBy("event_date").rowsBetween(-3,0)

    3.1.3.3 累加前N天,后M天: 假设N=3 M=5select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between 3 preceding and 5 following ) as sum_duration from userlogs_date

    Window.partitionBy("pcode").orderBy("event_date").rowsBetween(-3,5)

    3.1.3.4 累加该分区内所有行select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between unbounded preceding andunboundedfollowing) as sum_duration from userlogs_dateWindow.partitionBy("pcode").orderBy("event_date").rowsBetween(Long.MinValue,Long.MaxValue)

    总结如下:preceding:用于累加前N行(分区之内)。若是从分区第一行头开始,则为 unbounded。 N为:相对当前行向前的偏移量following :与preceding相反,累加后N行(分区之内)。若是累加到该分区结束,则为 unbounded。N为:相对当前行向后的偏移量current row:顾名思义,当前行,偏移量为0说明:上边的前N,后M,以及current row均会累加该偏移量所在行3.1.3.4 实测结果累加历史:分区内当天及之前所有 写法1:select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc) as sum_duration from userlogs_date

    69c5a8ac3fa60e0848d784a6dd461da6.png+-----+----------+------------+

    |pcode|event_date|sum_duration|

    +-----+----------+------------+

    | 1438|2016-05-13| 165|

    | 1438|2016-05-14| 760|

    | 1438|2016-05-15| 865|

    | 1629|2016-05-13| 12340|

    | 1629|2016-05-14| 26190|

    | 1629|2016-05-15| 26417|

    +-----+----------+------------+

    69c5a8ac3fa60e0848d784a6dd461da6.png累加历史:分区内当天及之前所有 写法2:select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between unbounded preceding and current row) as sum_duration from userlogs_date

    69c5a8ac3fa60e0848d784a6dd461da6.png+-----+----------+------------+

    |pcode|event_date|sum_duration|

    +-----+----------+------------+

    | 1438|2016-05-13| 165|

    | 1438|2016-05-14| 760|

    | 1438|2016-05-15| 865|

    | 1629|2016-05-13| 12340|

    | 1629|2016-05-14| 26190|

    | 1629|2016-05-15| 26417|

    +-----+----------+------------+

    69c5a8ac3fa60e0848d784a6dd461da6.png累加当日和昨天:select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between 1 preceding and current row) as sum_duration from userlogs_date

    69c5a8ac3fa60e0848d784a6dd461da6.png+-----+----------+------------+

    |pcode|event_date|sum_duration|

    +-----+----------+------------+

    | 1438|2016-05-13| 165|

    | 1438|2016-05-14| 760|

    | 1438|2016-05-15| 700|

    | 1629|2016-05-13| 12340|

    | 1629|2016-05-14| 26190|

    | 1629|2016-05-15| 14077|

    +-----+----------+------------+

    69c5a8ac3fa60e0848d784a6dd461da6.png累加当日、昨日、明日:select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between 1 preceding and 1 following ) as sum_duration from userlogs_date

    69c5a8ac3fa60e0848d784a6dd461da6.png+-----+----------+------------+

    |pcode|event_date|sum_duration|

    +-----+----------+------------+

    | 1438|2016-05-13| 760|

    | 1438|2016-05-14| 865|

    | 1438|2016-05-15| 700|

    | 1629|2016-05-13| 26190|

    | 1629|2016-05-14| 26417|

    | 1629|2016-05-15| 14077|

    +-----+----------+------------+

    69c5a8ac3fa60e0848d784a6dd461da6.png累加分区内所有:当天和之前之后所有:select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between unbounded preceding and unbounded following ) as sum_duration from userlogs_date

    69c5a8ac3fa60e0848d784a6dd461da6.png+-----+----------+------------+

    |pcode|event_date|sum_duration|

    +-----+----------+------------+

    | 1438|2016-05-13| 865|

    | 1438|2016-05-14| 865|

    | 1438|2016-05-15| 865|

    | 1629|2016-05-13| 26417|

    | 1629|2016-05-14| 26417|

    | 1629|2016-05-15| 26417|

    +-----+----------+------------+

    69c5a8ac3fa60e0848d784a6dd461da6.png3.2 业务场景2:统计全部

    3.2.1 spark sql实现

    69c5a8ac3fa60e0848d784a6dd461da6.png//spark sql 使用rollup添加all统计

    sqlContext.sql("""select pcode,event_date,sum(duration) as sum_duration

    from userlogs_date_1

    group by pcode,event_date with rollup

    order by pcode,event_date""").show()+-----+----------+------------+

    |pcode|event_date|sum_duration|

    +-----+----------+------------+

    | null| null| 27282|

    | 1438| null| 865|

    | 1438|2016-05-13| 165|

    | 1438|2016-05-14| 595|

    | 1438|2016-05-15| 105|

    | 1629| null| 26417|

    | 1629|2016-05-13| 12340|

    | 1629|2016-05-14| 13850|

    | 1629|2016-05-15| 227|

    +-----+----------+------------+

    69c5a8ac3fa60e0848d784a6dd461da6.png

    3.2.2 dataframe函数实现

    69c5a8ac3fa60e0848d784a6dd461da6.png//使用dataframe提供的rollup函数,进行多维度all统计

    df_userlogs_date.rollup($"pcode", $"event_date").agg(sum($"duration")).orderBy($"pcode", $"event_date")+-----+----------+-------------+

    |pcode|event_date|sum(duration)|

    +-----+----------+-------------+

    | null| null| 27282|

    | 1438| null| 865|

    | 1438|2016-05-13| 165|

    | 1438|2016-05-14| 595|

    | 1438|2016-05-15| 105|

    | 1629| null| 26417|

    | 1629|2016-05-13| 12340|

    | 1629|2016-05-14| 13850|

    | 1629|2016-05-15| 227|

    +-----+----------+-------------+

    69c5a8ac3fa60e0848d784a6dd461da6.png

    3.3 行转列 ->pivot

    pivot目前还没有sql语法,先用df语法吧

    69c5a8ac3fa60e0848d784a6dd461da6.pngval userlogs_date_all = sqlContext.sql("select dcode, pcode,event_date,sum(duration) as duration from userlogs group by dognum, pcode,event_date")

    userlogs_date_all.registerTempTable("userlogs_date_all")

    val dates= userlogs_date_all.select($"event_date").map(row => row.getAs[String]("event_date")).distinct().collect().toList

    userlogs_date_all.groupBy($"dcode", $"pcode").pivot("event_date", dates).sum("duration").na.fill(0).show+-----------------+-----+----------+----------+----------+----------+

    | dcode|pcode|2016-05-26|2016-05-13|2016-05-14|2016-05-15|

    +-----------------+-----+----------+----------+----------+----------+

    | F2429186| 1438| 0| 0| 227| 0|

    | AI2342441| 1438| 0| 0| 0| 345|

    | A320018711| 1438| 0| 939| 0| 0|

    | H2635817| 1438| 0| 522| 0| 0|

    | D0288196| 1438| 0| 101| 0| 0|

    | Y0242218| 1438| 0| 1036| 0| 0|

    | H2392574| 1438| 0| 0| 689| 0|

    | D2245588| 1438| 0| 0| 1| 0|

    | Y2514906| 1438| 0| 0| 118| 4|

    | H2540419| 1438| 0| 465| 242| 5|

    | R2231926| 1438| 0| 0| 305| 0|

    | H2684591| 1438| 0| 136| 0| 0|

    | A2548470| 1438| 0| 412| 0| 0|

    | GH000309| 1438| 0| 0| 0| 4|

    | H2293216| 1438| 0| 0| 0| 534|

    | R2170601| 1438| 0| 0| 0| 0|

    |B2365238;B2559538| 1438| 0| 0| 0| 0|

    | BQ005465| 1438| 0| 0| 642| 78|

    | AH2180324| 1438| 0| 608| 146| 36|

    | H0279306| 1438| 0| 490| 0| 0|

    +-----------------+-----+----------+----------+----------+----------+

    69c5a8ac3fa60e0848d784a6dd461da6.png

    附录

    下面是这两个函数的官方api说明:org.apache.spark.sql.scala

    69c5a8ac3fa60e0848d784a6dd461da6.png

    Create a multi-dimensional rollup for the current DataFrame using the specified columns, so we can run aggregation on them. See GroupedData forall the available aggregate functions.

    Thisisa variant of rollup that can only group by existing columns using column names (i.e. cannot construct expressions).// Compute the average for all numeric columns rolluped by department andgroup.

    df.rollup("department", "group").avg()// Compute the max age and average salary, rolluped by department andgender.

    df.rollup($"department", $"gender").agg(Map("salary" -> "avg","age" -> "max"))

    69c5a8ac3fa60e0848d784a6dd461da6.png

    69c5a8ac3fa60e0848d784a6dd461da6.pngdef rollup(cols: Column*): GroupedData

    Create a multi-dimensional rollup for the current DataFrame using the specified columns, so we can run aggregation on them. See GroupedData forall the available aggregate functions.

    df.rollup($"department", $"group").avg()// Compute the max age and average salary, rolluped by department andgender.

    df.rollup($"department", $"gender").agg(Map("salary" -> "avg","age" -> "max"))

    69c5a8ac3fa60e0848d784a6dd461da6.pngorg.apache.spark.sql.Column.scala

    69c5a8ac3fa60e0848d784a6dd461da6.pngdefover(window: WindowSpec): Column

    Define a windowing column.

    val w= Window.partitionBy("name").orderBy("id")

    df.select(

    sum("price").over(w.rangeBetween(Long.MinValue, 2)),

    avg("price").over(w.rowsBetween(0, 4))

    )

    69c5a8ac3fa60e0848d784a6dd461da6.png

    posted on 2017-10-23 22:05 xzc 阅读(616) 评论(0)  编辑  收藏 所属分类: hadoop

    展开全文
  • 作者: 一字马胡转载标志 【2017-11-03】更新日志日期更新内容备注2017-11-03添加转载标志持续更新Java Striped64Striped64是在java8中添加用来支持累加器的并发组件,它可以在并发环境下使用来做某种计数,Striped...
  • class count_demo { public static void main(String[] args) { int count=0; for (int x=1; x&lt;101;x++) { if (x%7==0) { count++; } } System.out.println(&...+coun...
  • Java语言高分悬赏:怎么实现36进制计数法的累加,要求能带进位算法的
  • ## **计数排序**计数排序不是基于比较的排序算法,其核心在于将输入的数据值转化为键存储在额外开辟的数组空间中。 作为一种线性时间复杂度...3. 对所有的计数累加(从C中的第一个元素开始,每一项和前一项相加);4....
  • 初始数据ListAnimalList=Lists.newArrayList(newAnimal("dog",6),newAnimal("dog",6),newAnimal("cat",7),newAnimal("cat",7),newAnimal("cat",7),newAnimal("pig",8));1)计数Mapmap=AnimalList.stream...
  • 计数器在很多网站中都进行了广泛的应用,比如文章的点赞数、页面的浏览数、网站的访客数、视频的播放数等等。...其中,incr() 方法用于累加计数,get_cnt() 方法用于获取当前的计数值。fromredisimportR...
  • package wzs.sort; import java.util....//对所有的计数累加(从C中的第一个元素开始,每一项和前一项相加) //反向填充目标数组:将每个元素i放在新数组的第C(i)项,每放一个元素就将C(i)减去1 public class Cou
  • 3.对所有的计数累加sum 4.反向填充目标数组:将计数数组的元素顺序倒出到新数组中,每倒出一个元素,计数数组的计数就减一(sum也减一),sum减到0结束 排序的各项指标 平均时间复杂度:O(N+K) 最坏时间复杂度:O(N+K)...
  • 计数二进制子串 (696) 主体思想:首先,求出字符串所有的子串;然后根据题目中的限制条件对所有子串逐一判断,对符合要求的子串进行累加。最后输出符合要求的子串次数count。 对子串的判断条件是:首先分别统计0和...
  • 2、新数组下标中对应的值重复的计数累加; 3、最后新数组中下标有值的依次放到数组中(下标本身已经排好序了)。 2、时间复杂度 平均时间复杂度O(n + k) 3、算法实现 public class CountSort { public static void ...
  • 计数排序(Countsort)之Java实现

    千次阅读 2013-12-06 21:39:09
    所谓排序算法,无非就是把正确的元素放到正确的位置,计数排序就是计算相同key的元素各有多少个,然后根据出现的次数累加而获得最终的位置信息。但是计数排序有两个限制条件,那就是存在一个正整数K,使得数组里面的...
  • 8、计数排序(Counting Sort)  计数排序不是基于比较的排序算法,其核心在于将输入的数据值转化为键存储在额外开辟的数组空间中。 作为一种线性时间复杂度的... 对所有的计数累加(从C中的第一个元素开始,每一...
  • 计数排序 原理 计数排序(Counting Sort) 使用了一个额外的数组 C,其中第 i 个元素是...对所有的计数累加(从 C 中的第一个元素开始,每一项和前一项相加); 反向填充目标数组:将每个元素 i 放在新数组的第 C(i)项,
  • 对于字符串的排序,不同与之前所学的值得排序,字符串的比较更为复杂,这里先学习一个基础算法叫做索引计数法,他是lfs和mfs的实现基础。 这个算法要求我们可以获取目标的键并按照这个键来排序。 此时我们有数据如下...
  • 初始数据ListAnimalList=Lists.newArrayList(newAnimal("dog",6),newAnimal("dog",6),newAnimal("cat",7),newAnimal("cat",7),newAnimal("cat",7),newAnimal("pig",8));1)计数Mapmap=AnimalList.stream...
  • 前言需要计算的场景绕不过去会使用BigDecimal类,可频繁的判空让代码可读性下降也使代码冗余度增高,不判空又容易报空指针异常,而且有些场景下...工具类支持功能加减乘除、累加、倍数运算。Integer、Long、Float、D...
  • 计数排序不是基于比较的排序算法,其核心是将输入的数据值转化为键存储在额外开辟的数组空间中... 对所有的计数累加(从数组C中的第一个元素开始,每一项和前一项相加); 反向填充目标数组:将每个元素i放在新数...
  • 计数排序、桶排序、基数排序计数排序思想代码时间和空间复杂度桶排序思想代码时间和空间复杂度基数排序思想代码时间和空间复杂度 计数排序 思想 作为非比较类排序算法...3.累加完之后反向填充到目标数组中:把每个元素
  • 1、计数排序 ...对所有的计数累加(从C中的第一个元素开始,每一项和前一项相加); 反向填充目标数组:将每个元素i放在新数组的第C(i)项,每放一个元素就将C(i)减去1。 分析:计数排序是一个稳定的排序算法
  • 转载来自:计数排序之Java实现 计数排序算法介绍 比较排序算法可以通过决策树模型证明,其下线是O(nlgn)。而本文介绍的是时间效率为O(n)的计数排序。所谓排序算法,无非就是把正确的元素放到正确的位置,...
  • Striped64是在java8中添加用来支持累加器的并发组件,它可以在并发环境下使用来做某种计数,Striped64的设计思路是在竞争激烈的时候尽量分散竞争,在实现上,Striped64维护了一个base Count和一个Cell数组,计数...

空空如也

空空如也

1 2 3 4
收藏数 76
精华内容 30
关键字:

java累加计数

java 订阅