精华内容
下载资源
问答
  • Java 并发框架全览,这个牛逼!

    千次阅读 2019-03-15 10:13:10
    https://dzone.com/articles/a-birds-eye-view-on-java-concurrency-frameworks-1 1. 为什么要写这篇文章 几年前 NoSQL 开始流行的时候,像其他团队一样,我们的团队也热衷于令人兴奋的新东西,并且计划替换一个...

    来自:唐尤华

    https://dzone.com/articles/a-birds-eye-view-on-java-concurrency-frameworks-1

    1. 为什么要写这篇文章

    几年前 NoSQL 开始流行的时候,像其他团队一样,我们的团队也热衷于令人兴奋的新东西,并且计划替换一个应用程序的数据库。 但是,当深入实现细节时,我们想起了一位智者曾经说过的话:“细节决定成败”。最终我们意识到 NoSQL 不是解决所有问题的银弹,而 NoSQL vs RDMS 的答案是:“视情况而定”。 类似地,去年RxJava 和 Spring Reactor 这样的并发库加入了让人充满激情的语句,如异步非阻塞方法等。为了避免再犯同样的错误,我们尝试评估诸如 ExecutorService、 RxJava、Disruptor 和 Akka 这些并发框架彼此之间的差异,以及如何确定各自框架的正确用法。

    本文中用到的术语在这里有更详细的描述。

    2. 分析并发框架的示例用例

    3. 快速更新线程配置

    在开始比较并发框架的之前,让我们快速复习一下如何配置最佳线程数以提高并行任务的性能。 这个理论适用于所有框架,并且在所有框架中使用相同的线程配置来度量性能。

    • 对于内存任务,线程的数量大约等于具有最佳性能的内核的数量,尽管它可以根据各自处理器中的超线程特性进行一些更改。

      • 例如,在8核机器中,如果对应用程序的每个请求都必须在内存中并行执行4个任务,那么这台机器上的负载应该保持为 @2 req/sec,在 ThreadPool 中保持8个线程。

    • 对于 I/O 任务,ExecutorService 中配置的线程数应该取决于外部服务的延迟。

      • 与内存中的任务不同,I/O 任务中涉及的线程将被阻塞,并处于等待状态,直到外部服务响应或超时。 因此,当涉及 I/O 任务线程被阻塞时,应该增加线程的数量,以处理来自并发请求的额外负载。

      • I/O 任务的线程数应该以保守的方式增加,因为处于活动状态的许多线程带来了上下文切换的成本,这将影响应用程序的性能。 为了避免这种情况,应该根据 I/O 任务中涉及的线程的等待时间按比例增加此机器的线程的确切数量以及负载。

    4. 性能测试结果

    性能测试配置 GCP -> 处理器:Intel(R) Xeon(R) CPU @ 2.30GHz;架构:x86_64;CPU 内核:8个(注意: 这些结果仅对该配置有意义,并不表示一个框架比另一个框架更好)。

    5. 使用执行器服务并行化 IO 任务

    5.1 何时使用?

    如果一个应用程序部署在多个节点上,并且每个节点的 req/sec 小于可用的核心数量,那么 ExecutorService 可用于并行化任务,更快地执行代码。

    5.2 什么时候适用?

    如果一个应用程序部署在多个节点上,并且每个节点的 req/sec 远远高于可用的核心数量,那么使用 ExecutorService 进一步并行化只会使情况变得更糟。

    当外部服务延迟增加到 400ms 时,性能测试结果如下(请求速率 @50 req/sec,8核)。

    5.3 所有任务按顺序执行示例

    // I/O 任务:调用外部服务
    String posts = JsonService.getPosts();
    String comments = JsonService.getComments();
    String albums = JsonService.getAlbums();
    String photos = JsonService.getPhotos();
    
    // 合并来自外部服务的响应
    // (内存中的任务将作为此操作的一部分执行)
    int userId = new Random().nextInt(10) + 1;
    String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments);
    String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos);
    
    // 构建最终响应并将其发送回客户端
    String response = postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;
    return response;

    5.4 I/O 任务与 ExecutorService 并行执行代码示例

    // 添加 I/O 任务
    List<Callable<String>> ioCallableTasks = new ArrayList<>();
    ioCallableTasks.add(JsonService::getPosts);
    ioCallableTasks.add(JsonService::getComments);
    ioCallableTasks.add(JsonService::getAlbums);
    ioCallableTasks.add(JsonService::getPhotos);
    
    // 调用所有并行任务
    ExecutorService ioExecutorService = CustomThreads.getExecutorService(ioPoolSize);
    List<Future<String>> futuresOfIOTasks = ioExecutorService.invokeAll(ioCallableTasks);
    
    // 获取 I/O  操作(阻塞调用)结果
    String posts = futuresOfIOTasks.get(0).get();
    String comments = futuresOfIOTasks.get(1).get();
    String albums = futuresOfIOTasks.get(2).get();
    String photos = futuresOfIOTasks.get(3).get();
    
    // 合并响应(内存中的任务是此操作的一部分)
    String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments);
    String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos);
    
    // 构建最终响应并将其发送回客户端
    return postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;

    6. 使用执行器服务并行化 IO 任务(CompletableFuture)

    与上述情况类似:处理传入请求的 HTTP 线程被阻塞,而 CompletableFuture 用于处理并行任务

    6.1 何时使用?

    如果没有 AsyncResponse,性能与 ExecutorService 相同。 如果多个 API 调用必须异步并且链接起来,那么这种方法更好(类似 Node 中的 Promises)。

    ExecutorService ioExecutorService = CustomThreads.getExecutorService(ioPoolSize);
    
    // I/O 任务
    CompletableFuture<String> postsFuture = CompletableFuture.supplyAsync(JsonService::getPosts, ioExecutorService);
    CompletableFuture<String> commentsFuture = CompletableFuture.supplyAsync(JsonService::getComments,
        ioExecutorService);
    CompletableFuture<String> albumsFuture = CompletableFuture.supplyAsync(JsonService::getAlbums,
        ioExecutorService);
    CompletableFuture<String> photosFuture = CompletableFuture.supplyAsync(JsonService::getPhotos,
        ioExecutorService);
    CompletableFuture.allOf(postsFuture, commentsFuture, albumsFuture, photosFuture).get();
    
    // 从 I/O 任务(阻塞调用)获得响应
    String posts = postsFuture.get();
    String comments = commentsFuture.get();
    String albums = albumsFuture.get();
    String photos = photosFuture.get();
    
    // 合并响应(内存中的任务将是此操作的一部分)
    String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments);
    String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos);
    
    // 构建最终响应并将其发送回客户端
    return postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;

    7. 使用 ExecutorService 并行处理所有任务

    使用 ExecutorService 并行处理所有任务,并使用 @suspended AsyncResponse response 以非阻塞方式发送响应。

    • HTTP 线程处理传入请求的连接,并将处理传递给 Executor Pool,当所有任务完成后,另一个 HTTP 线程将把响应发送回客户端(异步非阻塞)。

    • 性能下降原因:

      • 在同步通信中,尽管 I/O 任务中涉及的线程被阻塞,但是只要进程有额外的线程来承担并发请求负载,它仍然处于运行状态。

      • 因此,以非阻塞方式保持线程所带来的好处非常少,而且在此模式中处理请求所涉及的成本似乎很高。

      • 通常,对这里讨论采用的例子使用异步非阻塞方法会降低应用程序的性能。

    7.1 何时使用?

    如果用例类似于服务器端聊天应用程序,在客户端响应之前,线程不需要保持连接,那么异步、非阻塞方法比同步通信更受欢迎。在这些用例中,系统资源可以通过异步、非阻塞方法得到更好的利用,而不仅仅是等待。

    // 为异步执行提交并行任务
    ExecutorService ioExecutorService = CustomThreads.getExecutorService(ioPoolSize);
    CompletableFuture<String> postsFuture = CompletableFuture.supplyAsync(JsonService::getPosts, ioExecutorService);
    CompletableFuture<String> commentsFuture = CompletableFuture.supplyAsync(JsonService::getComments,
    ioExecutorService);
    CompletableFuture<String> albumsFuture = CompletableFuture.supplyAsync(JsonService::getAlbums,
    ioExecutorService);
    CompletableFuture<String> photosFuture = CompletableFuture.supplyAsync(JsonService::getPhotos,
    ioExecutorService);
    
    // 当 /posts API 返回响应时,它将与来自 /comments API 的响应结合在一起 
    // 作为这个操作的一部分,将执行内存中的一些任务
    CompletableFuture<String> postsAndCommentsFuture = postsFuture.thenCombineAsync(commentsFuture,
    (posts, comments) -> ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments),
    ioExecutorService);
    
    // 当 /albums API 返回响应时,它将与来自 /photos API 的响应结合在一起 
    // 作为这个操作的一部分,将执行内存中的一些任务
    CompletableFuture<String> albumsAndPhotosFuture = albumsFuture.thenCombineAsync(photosFuture,
    (albums, photos) -> ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos),
    ioExecutorService);
    
    // 构建最终响应并恢复 http 连接,把响应发送回客户端
    postsAndCommentsFuture.thenAcceptBothAsync(albumsAndPhotosFuture, (s1, s2) -> {
    LOG.info("Building Async Response in Thread " + Thread.currentThread().getName());
    String response = s1 + s2;
    asyncHttpResponse.resume(response);
    }, ioExecutorService);

    8. RxJava

    • 这与上面的情况类似,唯一的区别是 RxJava 提供了更好的 DSL 可以进行流式编程,下面的例子中没有体现这一点。

    • 性能优于 CompletableFuture 处理并行任务。

    8.1 何时使用?

    如果编码的场景适合异步非阻塞方式,那么可以首选 RxJava 或任何响应式开发库。 还具有诸如 back-pressure 之类的附加功能,可以在生产者和消费者之间平衡负载。

    int userId = new Random().nextInt(10) + 1;
    ExecutorService executor = CustomThreads.getExecutorService(8);
    
    // I/O 任务
    Observable<String> postsObservable = Observable.just(userId).map(o -> JsonService.getPosts())
    .subscribeOn(Schedulers.from(executor));
    Observable<String> commentsObservable = Observable.just(userId).map(o -> JsonService.getComments())
    .subscribeOn(Schedulers.from(executor));
    Observable<String> albumsObservable = Observable.just(userId).map(o -> JsonService.getAlbums())
    .subscribeOn(Schedulers.from(executor));
    Observable<String> photosObservable = Observable.just(userId).map(o -> JsonService.getPhotos())
    .subscribeOn(Schedulers.from(executor));
    
    // 合并来自 /posts 和 /comments API 的响应
    // 作为这个操作的一部分,将执行内存中的一些任务 
    Observable<String> postsAndCommentsObservable = Observable
    .zip(postsObservable, commentsObservable,
    (posts, comments) -> ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments))
    .subscribeOn(Schedulers.from(executor));
    
    // 合并来自 /albums 和 /photos API 的响应
    // 作为这个操作的一部分,将执行内存中的一些任务 
    Observable<String> albumsAndPhotosObservable = Observable
    .zip(albumsObservable, photosObservable,
    (albums, photos) -> ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos))
    .subscribeOn(Schedulers.from(executor));
    
    // 构建最终响应
    Observable.zip(postsAndCommentsObservable, albumsAndPhotosObservable, (r1, r2) -> r1 + r2)
    .subscribeOn(Schedulers.from(executor))
    .subscribe((response) -> asyncResponse.resume(response), e -> asyncResponse.resume("error"));

    9. Disruptor

    [Queue vs RingBuffer]

     

    • 在本例中,HTTP 线程将被阻塞,直到 disruptor 完成任务,并且使用 countdowlatch 将 HTTP 线程与 ExecutorService 中的线程同步。

    • 这个框架的主要特点是在没有任何锁的情况下处理线程间通信。在 ExecutorService 中,生产者和消费者之间的数据将通过 Queue传递,在生产者和消费者之间的数据传输过程中涉及到一个锁。 Disruptor 框架通过一个名为 Ring Buffer 的数据结构(它是循环数组队列的扩展版本)来处理这种生产者-消费者通信,并且不需要任何锁。

    • 这个库不适用于我们在这里讨论的这种用例。仅出于好奇而添加。

    9.1 何时使用?

    Disruptor 框架在下列场合性能更好:与事件驱动的体系结构一起使用,或主要关注内存任务的单个生产者和多个消费者。

    static {
        int userId = new Random().nextInt(10) + 1;
    
        // 示例 Event-Handler; count down latch 用于使线程与 http 线程同步
        EventHandler<Event> postsApiHandler = (event, sequence, endOfBatch) -> {
            event.posts = JsonService.getPosts();
            event.countDownLatch.countDown();
        };
    
        // 配置 Disputor 用于处理事件
        DISRUPTOR.handleEventsWith(postsApiHandler, commentsApiHandler, albumsApiHandler)
        .handleEventsWithWorkerPool(photosApiHandler1, photosApiHandler2)
        .thenHandleEventsWithWorkerPool(postsAndCommentsResponseHandler1, postsAndCommentsResponseHandler2)
        .handleEventsWithWorkerPool(albumsAndPhotosResponseHandler1, albumsAndPhotosResponseHandler2);
        DISRUPTOR.start();
    }
    
    // 对于每个请求,在 RingBuffer 中发布一个事件:
    Event event = null;
    RingBuffer<Event> ringBuffer = DISRUPTOR.getRingBuffer();
    long sequence = ringBuffer.next();
    CountDownLatch countDownLatch = new CountDownLatch(6);
    try {
        event = ringBuffer.get(sequence);
        event.countDownLatch = countDownLatch;
        event.startTime = System.currentTimeMillis();
    } finally {
        ringBuffer.publish(sequence);
    }
    try {
        event.countDownLatch.await();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    10. Akka

    • Akka 库的主要优势在于它拥有构建分布式系统的本地支持。

    • 它运行在一个叫做 Actor System 的系统上。这个系统抽象了线程的概念,Actor System 中的 Actor 通过异步消息进行通信,这类似于生产者和消费者之间的通信。

    • 这种额外的抽象级别有助于 Actor System 提供诸如容错、位置透明等特性。

    • 使用正确的 Actor-to-Thread 策略,可以对该框架进行优化,使其性能优于上表所示的结果。 虽然它不能在单个节点上与传统方法的性能匹敌,但是由于其构建分布式和弹性系统的能力,仍然是首选。

    10.1 示例代码

    // 来自 controller :
    Actors.masterActor.tell(new Master.Request("Get Response", event, Actors.workerActor), ActorRef.noSender());
    
    // handler :
    public Receive createReceive() {
        return receiveBuilder().match(Request.class, request -> {
        Event event = request.event; // Ideally, immutable data structures should be used here.
        request.worker.tell(new JsonServiceWorker.Request("posts", event), getSelf());
        request.worker.tell(new JsonServiceWorker.Request("comments", event), getSelf());
        request.worker.tell(new JsonServiceWorker.Request("albums", event), getSelf());
        request.worker.tell(new JsonServiceWorker.Request("photos", event), getSelf());
        }).match(Event.class, e -> {
        if (e.posts != null && e.comments != null & e.albums != null & e.photos != null) {
        int userId = new Random().nextInt(10) + 1;
        String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, e.posts,
        e.comments);
        String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, e.albums,
        e.photos);
        String response = postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;
        e.response = response;
        e.countDownLatch.countDown();
        }
        }).build();
    }

    11. 总结

    • 根据机器的负载决定 Executor 框架的配置,并检查是否可以根据应用程序中并行任务的数量进行负载平衡。

    • 对于大多数传统应用程序来说,使用响应式开发库或任何异步库都会降低性能。只有当用例类似于服务器端聊天应用程序时,这个模式才有用,其中线程在客户机响应之前不需要保留连接。

    • Disruptor 框架在与事件驱动的架构模式一起使用时性能很好; 但是当 Disruptor 模式与传统架构混合使用时,就我们在这里讨论的用例而言,它并不符合标准。 这里需要注意的是,Akka 和 Disruptor 库值得单独写一篇文章,介绍如何使用它们来实现事件驱动的架构模式。

    • 这篇文章的源代码可以在 GitHub 上找到。

    展开全文
  • 并发处理框架

    2013-03-19 08:06:37
    并发处理框架,并发处理框架,并发处理框架
  • Java并发框架整理

    2016-06-08 18:15:24
    多线程从1.2到1.7各种接口使用及场景介绍。
  • JAVA 并发框架 API 一览 一执行框程序 (Executor) 最常见的用法就是用 Executors 来构造相关的线程池用 CompletionService 来分离生产任务和已经完成的任务 生产者 submit 执行的任务使用者 take 已完成的任务并按照...
  • 本知识框架是对《Java并发编程实战》一书的总结,主要围绕基本线程和线程池的使用、线程使用中出现的三大问题以及如何解决这些问题出发总结出了这个知识框架,通过该框架能更加深入的去理解Java并发理论知识。
  • Java 并发Lock 框架略解 一synchronized 的局限性 与 Lock 的优点 如果一个代码块被synchronized关键字修饰当一个线程获取了对应的锁并执行该代码块时其他线程便只能一直等待直至占有锁的线程释放锁事实上占有锁的...
  • 主要介绍了Java并发框架:Executor API详解,随着当今处理器中可用的核心数量的增加, 随着对实现更高吞吐量的需求的不断增长,多线程 API 变得非常流行。 Java 提供了自己的多线程框架,称为 Executor 框架,需要的...
  • 并发框架Disruptor

    2017-11-21 10:19:14
    Java并发框架Disruptor,里面采取环形缓存结构,速度更快,适用于生产者消费者模式
  • java并发框架(Akka)

    千次阅读 2020-06-16 15:56:54
    本文来源,是我在阅读《实战java并发程序设计》第7章锁了解的,参考了这本书,在此记录一下。往后也好翻翻。 Akka是用Scala创建的,Scala也是java虚拟机上的语言,和java差不多。 学习一个框架或者一门技术,...

    本文来源,是我在阅读《实战java高并发程序设计》第7章所了解的,参考了这本书,在此记录一下。往后也好翻翻。

    大佬地址:很全:https://blog.csdn.net/qq_35246620/article/details/79050895?utm_medium=distribute.pc_relevant_t0.none-task-blog-OPENSEARCH-1.nonecase&depth_1-utm_source=distribute.pc_relevant_t0.none-task-blog-OPENSEARCH-1.nonecase

    Akka是用Scala创建的,Scala也是java虚拟机上的语言,和java差不多。

    学习一个框架或者一门技术,无外乎这三点,1是什么?2怎么用?3用了之后有什么好处?

    一,Akka能给我吗带来什么好处呢?

    1,首先,Akka提供了一种称之为Actor的并发模型,粒度比线程还要小,这表明你可以在系统当中创建及其大量的Actor

    2,Akka提供了一套容错机制,允许Actor出现异常时进行一些恢复或者重置的操作。

    3,Akka不仅可以在单机上构建高并发程序,还可以在网络上构建分布式程序,并提供位置透明的Actor定位服务。

     

    二,怎么用?

    先来创建一个hello world 程序试试手吧,

    创建一个maven工程,

    1,添加如下依赖

    <dependency>
      <groupId>com.typesafe.akka</groupId>
      <artifactId>akka-actor_2.12</artifactId>
      <version>2.5.2</version>
    </dependency>
    

     

    2,创建 欢迎者

    package org.buptdavid.datastructure.zj.akka.demo1;
    
    import akka.actor.UntypedAbstractActor;
    
    /**
     * 欢迎者
     */
    public class Greeter extends UntypedAbstractActor{
    
        public enum Msg{
            GREET,DONE
        }
        @Override
        public void onReceive(Object message) throws Throwable {
            if(message==Msg.GREET){
                System.out.println("hello actor...");
                getSender().tell(Msg.DONE,getSelf());
            }else {
                unhandled(message);
            }
    
        }
    }
    

    UntypedAbstractActor(无类型的,不能再继承其他类(java单继承))就是我们所说的Actor

    再来创建一个与Greeter 交流的另外一个Actor  ,HelloActor如下:

    package org.buptdavid.datastructure.zj.akka.demo1;
    
    import akka.actor.ActorRef;
    import akka.actor.Props;
    import akka.actor.UntypedAbstractActor;
    
    public class HelloActor extends UntypedAbstractActor{
        ActorRef greeterRef;
        @Override
        public void preStart() throws Exception { //是Akka的回调方法,在Actor启动前会被Akka调用,完成一些初始化操作
            greeterRef = getContext().actorOf(Props.create(Greeter.class), "greeter");
            System.out.println("Greeter Actor Path :"+greeterRef.path());
            greeterRef.tell(Greeter.Msg.GREET,getSelf());
        }
        @Override
        public void onReceive(Object msg) throws Throwable {//消息处理函数
            if(msg==Greeter.Msg.DONE){
                greeterRef.tell(Greeter.Msg.GREET,getSelf());
                getContext().stop(getSelf());//让自己停止
            }else{
                unhandled(msg);
            }
        }
    }

     

     

    现在来创建一个测试HelloMainSimple:

    package org.buptdavid.datastructure.zj.akka.demo1;
    
    import akka.actor.ActorRef;
    import akka.actor.ActorSystem;
    import akka.actor.Props;
    import com.typesafe.config.ConfigFactory;
    
    public class HelloMainSimple {
        public static void main(String[] args) {
            ActorSystem actorSystem = ActorSystem.create("Hello", ConfigFactory.load("samplehello.conf"));
            ActorRef helloactor = actorSystem.actorOf(Props.create(HelloActor.class), "helloactor");//创建一个接收该对象管理的actor对象
            System.out.println("helloactor Actor path  "+helloactor.path());
        }
    
    }
    

    上面代码解释

    ActorSystem.create 第一个参数为系统名称,此定义的名称为hello ,第二个参数为配置文件

     

    在此我们需要提供一个配置文件 samplehello.conf

    内容如下:

    akka{
        loglevel=INFO
    }

    此配置文件值简单的配置了一下日志级别

     

    运行结果:

     

     

     

    展开全文
  • java并发测试框架源码

    2013-07-04 13:59:10
    java并发测试 代码很简单,就是一个test类 原理是这样的 1、定义并发数n 2、创建1、2、3……n个线程,创建后悬停 3、所有线程创建好后,同时执行 好处: 比for循环创建线程后马上执行更能测试并发。 代码说明: ...
  • Java并发容器和框架

    千次阅读 2017-05-04 17:25:14
    锁分段技术:Hashtable在线程竞争激烈的并发环境下效率低下的原因是所有访问Hashtable 的线程都竞争同一把锁,假如容器有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程并发访问容器里的不同数据段的数据...

    1.       ConcurrentHashMap

     

    ConcurrentHashMap是线程安全并且高效的HashMap。

     

    1.1   为什么使用ConcurrentHashMap?

    (1)      HashMap 是线程不安全的,在多线程环境下使用HashMap的put操作会使Entry链表形成环形数据结构,一旦形成环形数据结构,Entry的next指针将永远不为null,就会产生死循环获取Entry。

    (2)      Hashtable虽然是线程安全的,但是效率低下。Hashtable使用synchronized关键字保证线程安全,因此当一个线程访问Hashtable的同步方法时,娶她线程若也要访问Hashtable的同步方法,就会进入阻塞状态。(所有的线程竞争的是同一把锁)

    (3)      ConcurrentHashMap使用锁分段技术可以保证线程安全并提高并发访问效率。

    锁分段技术:Hashtable在线程竞争激烈的并发环境下效率低下的原因是所有访问Hashtable 的线程都竞争同一把锁,假如容器有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程并发访问容器里的不同数据段的数据时,线程之间就不会存在竞争,从而可以有效提高并发访问效率。ConcurrentHashMap 就是使用的锁分段技术,首先将数据分成一段一段地存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一段数据时,其他数据段地数据也可以被其他线程访问。

     

    1.2   ConcurrentHashMap 的结构与操作

     

    ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种重入锁(ReentrantLock),HashEntry用来存储键值对数据。一个Segment里面包含一个HashEntry数组,每个Segment守护着一个HashEntry里的元素,当对HashEntry里的数据元素进行修改时,必修先获取对应的Segment锁。

     

    1.2.1        get操作

    先通过一次再散列,然后使用这个散列值通过散列运算定位到Segment,再通过散列算法定位到元素。get操作简单且高效,器高效之处在于整个get过程是不需要加锁的,除非读到的值为空时才加锁重读。不加锁是因为它的get方法里将要使用的共享变量都被volatile修饰了,能够保证线程间的可见性,能够被多线程读,并不会读到过期数据。之所以不会读到过期数据是因为根据JAVA内存模型的happen before原则,对于volatile字段的写入操作先于读取操作,即使两个线程同时修改和获取volatile变量,get操作也能拿到最新的数据,这是用volatile替代锁的经典应用场景。

    1.2.2        put 操作

    put操作需要对共享变量进行写入操作,为了线程安全,必须加锁。Put方法首先定位到Segment,然后在Segment里进行插入操作。

    1.2.3        size操作

    要统计整个ConcurrentHashMap就要对所有的Segment 的count进行相加,虽然volatile保证相加获取的都是最新的count,但是在相加过程中,每个count都可能变化,那样统计结果就不准确了。最安全的统计方法就是在计算size时,将所有的Segment锁住,但是这样的做法是很低效的。

    因为在累加count过程中,之前累加过的count发生变化的几率很小,所以ConcurrentHashMap的做法是先尝试2次通过锁住不同Segment的方式来统计各个Segment

    的大小,如果统计过程中,count发生了变化,则再采取加锁的放肆统计所有的Segment大小。

     

    2.       ConcurrentLinkedQueue

    2.1   线程安全的队列

    线程安全的队列实现方式有两种:一种是使用阻塞算法,一种是使用非阻塞算法。使用阻塞算法的队列可以使用一把锁或两把锁实现,使用非阻塞算法的队列可以通过循环CAS的方式实现。ConcurrentLinkedQueue是使用非阻塞算法实现的。

     

    ConcurrentLinkedQueue是一个基于链接节点的无界线程安全的队列 ,它采用CAS算法实现。

     

    2.1.1        入队列

    入队列就是将入队节点添加到队列的尾部。入队主要做两件事情:一是将入队节点设置成当前尾节点的下一个节点,二是更新tail节点,如果tail节点的next节点不为空,则将入队节点设置称tail节点,如果tail节点的next为空,则将入队节点设置成tail节点的next节点,所以tail节点并不总是尾节点。

    为什么tail节点不总是尾节点?原因:让tail永远作为队列的尾节点,每次入队都需要使用循环CAS更新tail节点。减少tail节点的更新次数,就可以提高入队效率。Doug lea 使用hops 变量控制并减少tail节点的更新频率,并不是每次节点入队后都需要更新tail节点,而是当tail节点和尾节点的距离大于等于常量hops时,才更新tail节点,tail节点和尾节点的距离越长,使用CAS更新tail的次数就越少,但是器负面效果就是每次入队时定位尾节点的时间就越长,因为循环体需要多循环一次来定位尾节点,但是这样仍可以提高入队效率,因为本质上来看它通过增加对volatile变量的读操作来减少对其的写操作,而volatile变量的写操作的开销要远大于读操作。

    2.1.2        出队列

    出队列就是从队列中返回一个节点元素,并清空该节点对元素的引用。和tail的跟新一样,head节点也并不是每次出队都更新。也是通过hops变量来控制并减少CAS更新head节点的操作。

    3.       阻塞队列介绍

     

    阻塞队列(BlockingQueue):是一个支持两个附加操作的队列。这两个附加操作支持阻塞插入和移除方法。

    (1)      支持阻塞的插入方法:当队列满时,会阻塞插入元素的线程,直到队列不满。

    (2)      支持阻塞的移除方法:当队列为空时,会阻塞移除元素的线程,直到队列不为空。

     

    阻塞队列常用于生产者和消费者模式,生产者就是向队列添加元素的线程,消费者就是从队列移除元素的线程。阻塞队列就是生产者用来存放元素,消费者用来获取元素的容器。

     

    当队列不可用时,会有4种处理方式:

     

     

     

    抛出异常

    返回特殊值

    一直阻塞

    超时退出

    插入方法

    add(e)

    offer(e)

    put(e)

    offer(e,time,unit)

    移除方法

    Remove()

    Poll()

    Take()

    Poll(time,unit)

    检查方法

    Element()

    Peek()

     

     

     

    (3)  java里的7个阻塞队列

                    ArrayBlockingueue:一个由数组组成的有界阻塞队列

                    LinkedBlockingQueue: 一个有链表组成的有界阻塞队列

                    PriorityBolckingQueue: 一个支持优先级排序的无界阻塞队列

                    DelayQueue: 一个使用优先级队列实现的无界阻塞队列

                    SynchronousQueue: 一个不存储元素的阻塞队列

                    LinkedTransferQueue: 一个由链表组成的无界阻塞队列

                    LinkedBlockingDeque:一个由链表组成的双向阻塞队列

    4.      Fork/Join框架

     

    4.1  Fork/Join框架

    Fork/Join框架是Java7提供的一个用于并行执行任务的框架,是一个把大任务分解成若干个小任务,最终汇总每个小任务的结果后得到大任务结果的框架。(使用的分治法思想)

    工作窃取算法:是指某个线程从其他队列里窃取任务来执行。为什么要使用工作窃取算法?原因:假如我们需要做一个比较大的任务,可以把这个任务分隔成若干个互不依赖的子任务,为了减少线程间的竞争,就把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应。但是,有的线程会先把自己的任务做完,而其他线程对应的队列里还有任务等待处理,于是他就去其他线程里窃取一个任务执行。而此时它们访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常使用双端队列,被窃取任务的线程永远从队列头部拿任务执行,而窃取任务的线程永远从队列的尾部取任务。

    4.2  Fork/Join 设计

    步骤1:分割任务  首先我们需要一个fork类来把大任务分割成子任务,直到子任务足够小。

    步骤2:执行任务并合并结果   分割的子任务分别放在双端队列里,然后启动结果线程分别从双端队列里拿任务。子任务执行完的结果都放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。

    Fork/Join 使用两个类来实现上述步骤:

    ①  ForkJoinTask: 要使用Fork/Join框架,首先要创建一个ForkJoin任务。它提供在任务种执行fork()和join()的机制。通常不是直接继承ForkJoinTask类,而是继承其子类。

    RecursiveAction: 用于没有返回结果的任务

    RecursiveTask : 用于有返回结果的任务

    ②   ForkJoinPool ForkJoinTask需要通过ForkJoinPool来执行。

     

     

     

     

     

     

     

     

     

     

     

     

    展开全文
  • 网盘链接
  • JAVA并发编程核心方法与框架-高洪岩著 java 多并发经典书籍
  • 主要介绍了Java并发之线程池Executor框架的深入理解,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
  • Java高性能异步并发框架Disruptor

    千次阅读 2020-03-07 23:19:11
    Java的内置队列 队列 有界性 锁 数据结构 ArrayBlockingQueue bounded 加锁 arraylist LinkedBlockingQueue optionally-bounded 加锁 linkedlist ConcurrentLinkedQueue ...

    Java的内置队列

    队列有界性数据结构
    ArrayBlockingQueuebounded加锁arraylist
    LinkedBlockingQueueoptionally-bounded加锁linkedlist
    ConcurrentLinkedQueueunbounded无锁(CAS)linkedlist
    LinkedTransferQueueunbounded无锁(CAS)linkedlist
    PriorityBlockingQueueunbounded加锁heap
    DelayQueueunbounded加锁heap

    队列的底层一般分成三种:数组、链表和堆。其中,堆一般情况下是为了实现带有优先级特性的队列

    Disruptor核心

    • RingBuffer Disruptor
    • Sequence SequenceBarrier
    • WaitStrategy等待策略
    • EventHandler消费者处理器
    • WorkProcessor核心工作器

    Disruptor

    Martin Fowler在自己网站上写了一篇LMAX架构的文章,在文章种介绍了LMAX是一种 新型零售金融交易平台,它能够以很低的延迟产生大量交易,这个系统是建立在jvm平台上, 其核心是一个业务逻辑处理器:

    • 它能够在一个线程里面每秒处理600万订单
    • 业务逻辑处理器完全是运行在内存中,使用事件驱动方式
    • 业务逻辑处理器的核心是Disruptor

    Disruptor为什么高性能

    • 数据结构:使用环形队列、数组、内存预加载
    • 消除伪共享(填充缓存行)
    • 使用单线程写方式,内存屏障(volatile变量)
    • 序号栅栏和序号配合使用来消除锁和CAS

    系统缓存优化-消除伪共享

    • 缓存系统中是以缓存行(cache line)为单位存储的
    • 缓存行是2的整数幂个连续字节,一般为32-256个字节
    • 最常见的缓存行大小是64个字节
    • Sequence

    Disruptor 核心原理

    • 一个环状队列,用在不同线程之间传递数据
    • RingBuffer拥有一个序号,这个序号指向数组中下一个可用元素
    • RingBuffer:基于数组的缓存实现,也是创建sequencer与定义WaitStrategy的入口
    • Disruptor:持有RingBuffer、消费者线程池Executor、消费者集合ConsumerRepository等引用

    Disruptor 核心-Sequence

    • 通过顺序递增的序号来编号,管理进行交换的数据
    • 对数据的处理过程总是沿着序号逐渐处理
    • 一个Sequence用于跟踪标识某个特定的事件处理者(RingBuffer/Producer/Consumer)的处理进度, 多个Producer共用一个Sequence,但每一个Consumer分别对应一个Sequence, 当Producer获取的Sequence大于多个Consumer中的最小Sequence时,则等待,不再继续投递数据.
    • Sequence可以看成是一个AtomicLong用于标识进度
    • Sequence可以消除CPU缓存伪共享的问题

    Disruptor 核心-Sequencer

    • 实现类SingleProducerSequencer和MultiProducerSequencer
    • 生产者和消费者之间快速,正确的传递数据的并发算法

    Disruptor 核心-Sequencer Barrier

    • 用于保持对RingBuffer的Main Published Sequence(Producer)和Consumer之间的平衡关系;
    • Sequence Barrier用于判断决定Consumer是否还有可处理的事件.

    Disruptor 核心-Event

    • 从生产者到消费者过程中处理的数据
    • Disruptor中没有代码表示Event,而是用户自定义的,其实就是对应的处理数据实体类

    Disruptor 核心-EventProcessor

    • 继承Runnable接口,处理Disruptor中的Event,拥有消费者的Sequence
    • 其实现类BatchEventProcessor,包含了Event loop有效的实现,并且将回调到一个EventHandler接口实现

    Disruptor 核心-EventHandler

    • 代表一个消费者,用于处理队列中的数据,单消费者模式中使用
    • 线程池数量必须等于EventHandler数量.

    Disruptor 核心-WorkHandler

    Disruptor 核心-WorkProcessor

    确保每个Sequence只被一个processor消费,在同一个workPool中处理多个WorkProcessor不会消费同样的Sequence

    Disruptor 核心-WaitStrategy

    • 决定向队列添加数据时的等待策略

    BlockingWaitStrategy

    最低效的策略,但其对CPU的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现, 内部使用了ReentrantLock

    SleepingWaitStrategy

    性能表现跟BlockingWaitStrategy差不多,对CPU的消耗也类似,但其对生产者线程的影响最小, 适用于异步日志类的场景

    YieldingWaitStrategy

    性能是最好的,适用于低延时的系统. 在要求极高性能且事件处理线程数小于CPU逻辑核心数的场景中, 推荐使用此策略;例如,CPU开启超线程的特性.
    该策略慎用: 内部采用Thread.yield();会让消费线程让出cpu,但又会在让出后同时竞争资源,导致CPU飙升.

    disruptor.shutdown();

    • 会等待所有的消费者消费完成后关闭,
    • 但是不能在数据生产结束前调用shutdown,否则会导致过早的关闭disruptor,但是此时会继续投递数据, 而获取next Sequence时会导致死循环,因为(获取的生产者Sequence大于消费者Sequence,认为有数据没有消费完, 不能向该Sequence位置添加新数据,但是消费者此时已经被关闭,消费者Sequence不会再改变, 所以出现无限循环等待,无法继续向队列添加数据)

    WorkerPool实现多消费者模式

    单消费者模式中,虽然一个Handler对应一个线程,但是这些Handler处理的是同一个数据,所以是单消费者模式.
    多消费者模式: 多个线程处理的是不同的数据,队列中的数据只会被一条线程处理.

    Disruptor中消费线程 串行操作(单消费者模式)

      // 一个Handler对应一条线程,串行操作
     
      disruptor.handleEventsWith(new Handler1())
               .handleEventsWith(new Handler2())
               .handleEventsWith(new Handler3());

    Disruptor中消费线程 并行操作(单消费者模式)

      // 一个Handler对应一条线程,并行操作
      
      disruptor.handleEventsWith(new Handler1());
      disruptor.handleEventsWith(new Handler2());
      disruptor.handleEventsWith(new Handler3());
      // 一个Handler对应一条线程,并行操作
      
      disruptor.handleEventsWith(new Handler1(), new Handler2(), new Handler3());

    Disruptor中消费线程 串并行混合操作(单消费者模式)

      //Handler1 Handler2 并行执行完成后,串执行Handler3
      disruptor.handleEventsWith(new Handler1(), new Handler2())
               .handleEventsWith(new Handler3());
      //Handler1 Handler2 并行执行完成后,串执行Handler3
      EventHandlerGroup<Trade> group = disruptor.handleEventsWith(new Handler1(), new Handler2());
      group.then(new Handler3());

    6边形操作(单消费者模式)

                     --> h1 --> h2 -->
                  --                   --
        start -->                        --> h3
                  --                   --
                     --> h4 --> h5 -->
            
        h1和h4并行,h1和h2串行,h4和h5串行,h2和h5全部完成后执行 h3
    
        Handler1 h1 = new Handler1();
        Handler2 h2 = new Handler2();
        Handler3 h3 = new Handler3();
        Handler4 h4 = new Handler4();
        Handler5 h5 = new Handler5();
    
        disruptor.handleEventsWith(h1,h4);
        disruptor.after(h1).handleEventsWith(h2);
        disruptor.after(h4).handleEventsWith(h5);
        disruptor.after(h2,h5).handleEventsWith(h3);

    线程池设置参考规则

    • 计算机密集型,耗cpu, 一般是cpu核心数+1或cpu核心数*2
    • IO密集型,一般是 cpu核心数 / (1-0.9) 或者 cpu核心数 / (1-0.8) 例如:8核处理器, 8 / (1-0.9) = 80 条io线程

    AOS架构

    AQS是AbstractQueuedSynchronizer的简称。AQS提供了一种实现阻塞锁和一系列依赖FIFO等待队列的同步器的框架.

    • AQS维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)
    • AQS定义两种资源共享方式: Exclusive, Share
    isHeldExclusively 方法: 该线程是否正在独占资源
    tryAcquire/tryRelease方法: 独占的方式尝试获取和释放资源
    tryAcquireShared/tryReleaseShared方法: 共享方式

    参考: https://www.jianshu.com/p/da9d051dcc3d

    ReentrantLock 重入锁

    1. state初始值为0,表示未锁定状态
    2. A线程lock时,会调用tryAcquire()独占该锁并将 state + 1
    3. 其它线程再tryAcquire()时就会失败,直到A线程unlock()到state=0为止,其他线程才会有机会获取该锁
    4. A线程释放之前,A线程自己可以重复获取,此锁的(state会累加),这就是可重入的概念
    5. 但是要注意,获取多少次就要释放多少次,这样才能保证state是能回到0.

    独享锁:该锁每一次只能被一个线程所持有。

    共享锁:该锁可被多个线程共有,典型的就是ReentrantReadWriteLock里的读锁,它的读锁是可以被共享的,但是它的写锁确每次只能被独占。

    com.lmax.disruptor.SingleProducerSequencer.next(int) 源码分析

    @Override
    public long next(int n)
    {
        if (n < 1)
        {
            throw new IllegalArgumentException("n must be > 0");
        }
    
        long nextValue = this.nextValue;
    
        long nextSequence = nextValue + n;
        
        //wrapPoint用于判断当前的序号有没有绕过整个RingBuffer容器
        //相当于标记生产者在队列中的逻辑队尾位置,
        long wrapPoint = nextSequence - bufferSize;
        
        //记录最小消费者序号
        long cachedGatingSequence = this.cachedValue;
    
        //如果生产者序号大于最小消费者序号,则可能需要等待
        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
        {
            cursor.setVolatile(nextValue);  // StoreLoad fence
    
            //最小的序号
            long minSequence;
            
            //如果生产者序号大于消费者中最小的序号,则自旋,等待空间
            while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
            {
                LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
            }
    
            this.cachedValue = minSequence;
        }
    
        this.nextValue = nextSequence;
    
        return nextSequence;
    }
    展开全文
  • java并发编程的核心方法和框架.pdf》,好好学习,天天向上。。。
  • JavaExecutor并发框架.pdf

    2021-10-05 10:50:31
    JavaExecutor并发框架.pdf
  • 包含处理测量单位和物理量的框架 核心 API 支持对 Java 飞行记录进行无头分析 应用特点 用于托管各种有用 Java 工具的应用程序支持框架 一种用于可视化 Java 飞行记录内容以及内容自动分析结果的工具 JMX 控制台 ...
  • JAVA并发编程

    2018-03-06 10:42:27
    JAVA 并发编程 核心方法 框架 高洪岩 文档很贵,赚点积分
  • Java异步NIO框架Netty实现高性能高并发无标题笔记 1. 背景 1.1. 惊人的性能数据 最近一个圈内朋友通过私信告诉我,通过使用Netty4 + Thrift压缩二进制编解码技术,他们实现了10W TPS(1K的复杂POJO对象)的跨 节点...
  • 实现 Java 多线程并发控制框架

    千次阅读 2019-01-08 04:46:25
    2006 年 8 月 14 日 Java 提供了语言级别的线程支持,所以在 Java 中使用多线程相对于 C,C++ 来说更简单便捷,但...在本文中,我们将讨论如何实现一个 Java 多线程的运行框架以及我们是如何来控制线程的并发同步...
  • Java并发编程全套源码

    2018-01-07 15:40:46
    Java并发编程全套源码,涵盖常见并发编程常见的所有知识点,如对象锁,Executors多任务线程框架,线程池等代码示例.
  • java并发编程艺术(带目录),内部有java并发框架的原理和相关需要注意的地方很好
  • Java并发编程Xmind思维导图,思路更清晰。内容来自《Java并发编程的艺术》,包括并发机制底层原理、Java内存模型、Java并发编程基础、锁机制、线程池、并发工具类、原子操作类、并发容器和框架。纯手打,非诚勿扰。
  • java并发编程源码 JCPCMF Java Concurrency Programming Core Method and Frameworks Maven Source Code
  • Java并发框架Quasar

    万次阅读 2016-11-30 10:46:49
    Java在高并发场景下相对Go和Erlang等语言来说具有较大劣势,原因就是Java几乎只有线程池一条路好走。假设有个场景,一个业务需要Http从第三方获取数据,平均获取时间是500ms,如果用线程池解决,假设线程最多是500,...
  • JDK1.8版本,java并发框架支持锁包括

    千次阅读 2017-03-15 21:58:13
    13、锁粗化 ,多锁变成一个,自己处理 14、轻量级锁 ,CAS实现 15、锁消除 ,偏向锁就是锁消除的一种 16、锁膨胀 , jvm实现,锁粗化 17、信号量,使用阻塞锁实现的一种策略 18、排他锁(不包含),X锁...
  • java 手写并发框架(二)异步转同步框架封装锁策略.pdf
  • java并发书籍xxxxxxx

    2018-04-07 16:24:22
    Java并发编程的艺术JAVA并发编程-核心方法与框架Java多线程编程核心技术Java多线程编程核心技术Java多线程编程核心技术Java多线程编程核心技术

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 248,743
精华内容 99,497
关键字:

java并发处理框架

java 订阅