精华内容
下载资源
问答
  • 尚硅谷NIO百度云链接

    2017-02-23 16:30:21
    尚硅谷NIO百度云链接
  • 尚硅谷java学习笔记——NIO(New IO)

    千次阅读 多人点赞 2017-03-08 19:45:59
    Java NIO(New IO或 Non Blocking IO)是从Java 1.4版本开始引入的一个新的IO API,可以替代标准的Java IO API。NIO支持面向缓冲区的、基于通道的IO操作。NIO将以更加高效的方式进行文件的读写操作。java IO 与 java...

    Java NIO(New IO或 Non Blocking IO)是从Java 1.4版本开始引入的一个新的IO API,可以替代标准的Java IO API。NIO支持面向缓冲区的、基于通道的IO操作。NIO将以更加高效的方式进行文件的读写操作。

    java IO 与 java NIO 的区别
    这里写图片描述

    一、通道(Channel)与缓冲区(Buffer)

    若需要使用 NIO 系统,需要获取用于连接 IO 设备的通道以及用于容纳数据的缓冲区。然后操作缓冲区,对数据进行处理。简而言之,Channel 负责传输, Buffer 负责存储。

    1、缓冲区(Buffer)

    缓冲区(Buffer) :一个用于特定基本数据类型的容器。由 java.nio 包定义的,所有缓冲区都是 Buffer 抽象类的子类。
    Java NIO 中的 Buffer 主要用于与 NIO 通道进行交互,数据是从通道读入缓冲区,从缓冲区写入通道中的。

    这里写图片描述

    Buffer 的常用方法
    这里写图片描述

    非直接缓冲区
    这里写图片描述

    直接缓冲区
    这里写图片描述

    /*
     * 一、缓冲区(Buffer):在java NIO 中负者数据的存储。缓冲区就是数组。用于存储不同类型的数据。
     * 
     * 根据数据类型的不同(boolean 除外),有以下 Buffer 常用子类:
     * ByteBuffer
     * CharBuffer
     * ShortBuffer
     * IntBuffer
     * LongBuffer
     * FloatBuffer
     * DoubleBuffer
     * 
     * 上述缓冲区的管理方式几乎一致,通过allocate()获取缓冲区
     * 
     * 二、缓冲区存取数据的两个核心方法:
     * put():存入数据到缓冲区中
     *       put(byte b):将给定单个字节写入缓冲区的当前位置
     *       put(byte[] src):将 src 中的字节写入缓冲区的当前位置
     *       put(int index, byte b):将指定字节写入缓冲区的索引位置(不会移动 position)
     * get():获取缓存区中的数据
     *       get() :读取单个字节
     *       get(byte[] dst):批量读取多个字节到 dst 中
     *       get(int index):读取指定索引位置的字节(不会移动 position)
     *       
     * 三、缓冲区中的四个核心属性:
     * capacity:容量,表示缓冲区中最大存储数据的容量。一旦声明不能改变。
     * limit:界限,表示缓冲区中可以操作数据的大小。(limit后数据不能进行读写)
     * position:位置,表示缓冲区中正在操作数据的位置。
     * mark:标记,表示记录当前position位置。可以通过reset()恢复到mark的位置。
     * 
     * 0<=mark<=position<=limit<=capacity
     * 
     * 四、直接缓冲区与非直接缓冲区:
     * 非直接缓冲区:通过allocate()方法分配缓冲区,将缓冲区建立在JVM的内存中。
     *            
     * 直接缓冲区:通过allocateDirect()方法分配直接缓冲区,将缓冲区建立在物理内存中。可以提高效率
     *          此方法返回的 缓冲区进行分配和取消分配所需成本通常高于非直接缓冲区 。
     *          直接缓冲区的内容可以驻留在常规的垃圾回收堆之外.
     *          将直接缓冲区主要分配给那些易受基础系统的本机 I/O 操作影响的大型、持久的缓冲区。
     *          最好仅在直接缓冲区能在程序性能方面带来明显好处时分配它们。
     *          直接字节缓冲区还可以过 通过FileChannel 的 map() 方法 将文件区域直接映射到内存中来创建 。该方法返回MappedByteBuffe
     */
    public class TestBuffer {
        @Test
        public void test1(){
            String str="abcde";
    
            //1.分配一个指定大小的缓冲区
            ByteBuffer buf=ByteBuffer.allocate(1024);
    
            System.out.println("--------------allocate()----------------");
            System.out.println(buf.position());//0
            System.out.println(buf.limit());//1024
            System.out.println(buf.capacity());//1024
    
            //2.利用put()存放数据到缓冲区中
            buf.put(str.getBytes());
    
            System.out.println("-------------put()-------------");
            System.out.println(buf.position());//5
            System.out.println(buf.limit());//1024
            System.out.println(buf.capacity());//1024
    
            //3.切换读取数据模式
            buf.flip();
            System.out.println("--------------flip()------------");
            System.out.println(buf.position());//0
            System.out.println(buf.limit());//5
            System.out.println(buf.capacity());//1024
    
            //4.利用get()读取缓冲区中的数据
            byte[] dst=new byte[buf.limit()];
            buf.get(dst);
            System.out.println(new String(dst,0,dst.length));//abcd
    
            System.out.println("--------------get()------------");
            System.out.println(buf.position());//5
            System.out.println(buf.limit());//5
            System.out.println(buf.capacity());//1024
    
            //5.rewind():可重复读
            buf.rewind();
    
            System.out.println("--------------rewind()------------");
            System.out.println(buf.position());//0
            System.out.println(buf.limit());//5
            System.out.println(buf.capacity());//1024
    
            //6.clear():清空缓冲区。但是缓冲区中的数据依然存在,但是处在“被遗忘”状态
            buf.clear();
    
            System.out.println("--------------clear()------------");
            System.out.println(buf.position());//0
            System.out.println(buf.limit());//1024
            System.out.println(buf.capacity());//1024
    
            System.out.println((char)buf.get());
        }
    
        @Test
        public void test2(){
            String str="abcde";
    
            ByteBuffer buf=ByteBuffer.allocate(1024);
    
            buf.put(str.getBytes());
    
            buf.flip();
    
            byte[] dst=new byte[buf.limit()];
            buf.get(dst,0,2);
            System.out.println(new String(dst,0,2));//ab
            System.out.println(buf.position());//2
    
            //mark():标记
            buf.mark();
    
            buf.get(dst,2,2);//再读两个位置
            System.out.println(new String(dst, 2, 2));//cd
            System.out.println(buf.position());//4
    
            //reset():恢复到mark的位置
            buf.reset();
            System.out.println(buf.position());//2
    
            //判断缓冲区中是否还有剩余数据
            if(buf.hasRemaining()){
                //获取缓冲区中可以操作的数量
                System.out.println(buf.remaining());//3
            }
        }
    
        @Test
        public void test3(){
            //分配直接缓冲区
            ByteBuffer buf=ByteBuffer.allocate(1024);
            System.out.println(buf.isDirect());//false
        }
    }

    2、通道(Channel)

    通道:由java.nio.channels包定义。
    Channel表示IO源与目标打开的连接。
    Channel类似于传统的“流”。但其自身不能直接访问数据,Channel只能与Buffer进行交互。

    这里写图片描述

    操作系统中:通道是一种通过执行通道程序管理I/O操作的控制器,它使主机(CPU和内存)与I/O操作之间达到更高的并行程度。需要进行I/O操作时,CPU只需启动通道,然后可以继续执行自身程序,通道则执行通道程序,管理与实现I/O操作。

    FileChannel 的常用方法
    这里写图片描述

    /*
     * 一、通道(Channel):用于源节点与目标节点的连接。在java NIO中负责缓冲区中数据的传输。Channel本身不存储数据,需要配合缓冲区进行传输。
     * 
     * 二、通道的主要实现类
     *    java.nio.channels.Channel 接口:
     *        |--FileChannel:用于读取、写入、映射和操作文件的通道。
     *        |--SocketChannel:通过 TCP 读写网络中的数据。
     *        |--ServerSocketChannel:可以监听新进来的 TCP 连接,对每一个新进来的连接都会创建一个 SocketChannel。
     *        |--DatagramChannel:通过 UDP 读写网络中的数据通道。
     *        
     * 三、获取通道
     * 1.java针对支持通道的类提供了getChannel()方法
     *      本地IO:
     *      FileInputStream/FileOutputStream
     *      RandomAccessFile
     *      
     *      网络IO:
     *      Socket
     *      ServerSocket
     *      DatagramSocket
     *      
     * 2.在JDK 1.7 中的NIO.2 针对各个通道提供了静态方法 open()
     * 3.在JDK 1.7 中的NIO.2 的Files工具类的newByteChannel()
     * 
     * 四、通道之间的数据传输
     * transferFrom()
     * transferTo()
     * 
     * 五、分散(Scatter)与聚集(Gather)
     * 分散读取(Scattering Reads):将通道中的数据分散到多个缓冲区中
     * 聚集写入(Gathering Writes):将多个缓冲区中的数据聚集到通道中
     * 
     * 六、字符集:Charset
     * 编码:字符串-》字符数组
     * 解码:字符数组-》字符串
     */
    public class TestChannel {
    
        //利用通道完成文件的复制(非直接缓冲区)
        @Test
        public void test1(){
            long start=System.currentTimeMillis();
    
            FileInputStream fis=null;
            FileOutputStream fos=null;
    
            FileChannel inChannel=null;
            FileChannel outChannel=null;
            try{
                fis=new FileInputStream("d:/1.avi");
                fos=new FileOutputStream("d:/2.avi");
    
                //1.获取通道
                inChannel=fis.getChannel();
                outChannel=fos.getChannel();
    
                //2.分配指定大小的缓冲区
                ByteBuffer buf=ByteBuffer.allocate(1024);
    
                //3.将通道中的数据存入缓冲区中
                while(inChannel.read(buf)!=-1){
                    buf.flip();//切换读取数据的模式
                    //4.将缓冲区中的数据写入通道中
                    outChannel.write(buf);
                    buf.clear();//清空缓冲区
                }
            }catch(IOException e){
                e.printStackTrace();
            }finally{
                if(outChannel!=null){
                    try {
                        outChannel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if(inChannel!=null){
                    try {
                        inChannel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if(fos!=null){
                    try {
                        fos.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if(fis!=null){
                    try {
                        fis.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
            long end=System.currentTimeMillis();
            System.out.println("耗费时间:"+(end-start));//耗费时间:1094
        }
    
        //使用直接缓冲区完成文件的复制(内存映射文件)
        @Test
        public void test2() {
            long start=System.currentTimeMillis();
    
            FileChannel inChannel=null;
            FileChannel outChannel=null;
            try {
                inChannel = FileChannel.open(Paths.get("d:/1.avi"), StandardOpenOption.READ);
                outChannel=FileChannel.open(Paths.get("d:/2.avi"), StandardOpenOption.WRITE,StandardOpenOption.READ,StandardOpenOption.CREATE);
    
                //内存映射文件
                MappedByteBuffer inMappedBuf=inChannel.map(MapMode.READ_ONLY, 0, inChannel.size());
                MappedByteBuffer outMappedBuf=outChannel.map(MapMode.READ_WRITE, 0, inChannel.size());
                //直接对缓冲区进行数据的读写操作
                byte[] dst=new byte[inMappedBuf.limit()];
                inMappedBuf.get(dst);
                outMappedBuf.put(dst);
            } catch (IOException e) {
                e.printStackTrace();
            }finally{
                if(outChannel!=null){
                    try {
                        outChannel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if(inChannel!=null){
                    try {
                        inChannel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
    
            long end=System.currentTimeMillis();
            System.out.println("耗费的时间为:"+(end-start));//耗费的时间为:200
        }
    
        //通道之间的数据传输(直接缓冲区)
        @Test
        public void test3(){
            long start=System.currentTimeMillis();
    
            FileChannel inChannel=null;
            FileChannel outChannel=null;
            try {
                inChannel = FileChannel.open(Paths.get("d:/1.avi"), StandardOpenOption.READ);
                outChannel=FileChannel.open(Paths.get("d:/2.avi"), StandardOpenOption.WRITE,StandardOpenOption.READ,StandardOpenOption.CREATE);
    
                inChannel.transferTo(0, inChannel.size(), outChannel);
                outChannel.transferFrom(inChannel, 0, inChannel.size());
            } catch (IOException e) {
                e.printStackTrace();
            }finally{
                if(outChannel!=null){
                    try {
                        outChannel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if(inChannel!=null){
                    try {
                        inChannel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
            long end=System.currentTimeMillis();
            System.out.println("耗费的时间为:"+(end-start));//耗费的时间为:147
        }
    
        //分散和聚集
        @Test
        public void test4(){
            RandomAccessFile raf1=null;
            FileChannel channel1=null;
            RandomAccessFile raf2=null;
            FileChannel channel2=null;
            try {
                raf1=new RandomAccessFile("1.txt","rw");
    
                //1.获取通道
                channel1=raf1.getChannel();
    
                //2.分配指定大小的缓冲区
                ByteBuffer buf1=ByteBuffer.allocate(100);
                ByteBuffer buf2=ByteBuffer.allocate(1024);
    
                //3.分散读取
                ByteBuffer[] bufs={buf1,buf2};
                channel1.read(bufs);
    
                for(ByteBuffer byteBuffer : bufs){
                    byteBuffer.flip();
                }
                System.out.println(new String(bufs[0].array(),0,bufs[0].limit()));
                System.out.println("--------------------");
                System.out.println(new String(bufs[1].array(),0,bufs[1].limit()));
    
                //4.聚集写入
                raf2=new RandomAccessFile("2.txt", "rw");
                channel2=raf2.getChannel();
    
                channel2.write(bufs);
    
            }catch (IOException e) {
                e.printStackTrace();
            }finally{
                if(channel2!=null){
                    try {
                        channel2.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if(channel1!=null){
                    try {
                        channel1.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if(raf2!=null){
                    try {
                        raf2.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if(raf1!=null){
                    try {
                        raf1.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
        //输出支持的字符集
        @Test
        public void test5(){
            Map<String,Charset> map=Charset.availableCharsets();
            Set<Entry<String,Charset>> set=map.entrySet();
    
            for(Entry<String,Charset> entry:set){
                System.out.println(entry.getKey()+"="+entry.getValue());
            }
        }
    
        //字符集
        @Test
        public void test6(){
            Charset cs1=Charset.forName("GBK");
    
            //获取编码器
            CharsetEncoder ce=cs1.newEncoder();
    
            //获取解码器
            CharsetDecoder cd=cs1.newDecoder();
    
            CharBuffer cBuf=CharBuffer.allocate(1024);
            cBuf.put("啦啦哈哈吧吧");
            cBuf.flip();
    
            //编码
            ByteBuffer bBuf=null;
            try {
                bBuf = ce.encode(cBuf);
            } catch (CharacterCodingException e) {
                e.printStackTrace();
            }
    
            for(int i=0;i<12;i++){
                System.out.println(bBuf.get());//-64-78-64-78-71-2-7-2-80-55-80-55
            }
    
            //解码
            bBuf.flip();
            CharBuffer cBuf2=null;
            try {
                cBuf2 = cd.decode(bBuf);
            } catch (CharacterCodingException e) {
                e.printStackTrace();
            }
            System.out.println(cBuf2.toString());//啦啦哈哈吧吧
        }
    }

    二、NIO 的非阻塞式网络通信

    传统的 IO 流都是阻塞式的。也就是说,当一个线程调用 read() 或 write()时,该线程被阻塞,直到有一些数据被读取或写入,该线程在此期间不能执行其他任务。因此,在完成网络通信进行 IO 操作时,由于线程会阻塞,所以服务器端必须为每个客户端都提供一个独立的线程进行处理,当服务器端需要处理大量客户端时,性能急剧下降。

    Java NIO 是非阻塞模式的。当线程从某通道进行读写数据时,若没有数据可用时,该线程可以进行其他任务。线程通常将非阻塞 IO 的空闲时间用于在其他通道上执行 IO 操作,所以单独的线程可以管理多个输入和输出通道。因此,NIO 可以让服务器端使用一个或有限几个线程来同时处理连接到服务器端的所有客户端。

    选择器(Selector)
    选择器(Selector) 是 SelectableChannle 对象的多路复用器,Selector 可以同时监控多个 SelectableChannel 的 IO 状况,也就是说,利用 Selector可使一个单独的线程管理多个 Channel。Selector 是非阻塞 IO 的核心。

    /*
     * 一、使用NIO 完成网络通信的三个核心:
     * 
     * 1、通道(Channel):负责连接
     *      java.nio.channels.Channel 接口:
     *           |--SelectableChannel
     *               |--SocketChannel
     *               |--ServerSocketChannel
     *               |--DatagramChannel
     *               
     *               |--Pipe.SinkChannel
     *               |--Pipe.SourceChannel
     *               
     * 2.缓冲区(Buffer):负责数据的存取
     * 
     * 3.选择器(Selector):是 SelectableChannel 的多路复用器。用于监控SelectableChannel的IO状况
     */
    public class TestBlockingNIO {//没用Selector,阻塞型的
    
        //客户端
        @Test
        public void client() throws IOException{
            SocketChannel sChannel=SocketChannel.open(new InetSocketAddress("127.0.0.1",9898));
            FileChannel inChannel=FileChannel.open(Paths.get("1.jpg"), StandardOpenOption.READ);
            ByteBuffer buf=ByteBuffer.allocate(1024);
            while(inChannel.read(buf)!=-1){
                buf.flip();
                sChannel.write(buf);
                buf.clear();
            }
            sChannel.shutdownOutput();//关闭发送通道,表明发送完毕
    
            //接收服务端的反馈
            int len=0;
            while((len=sChannel.read(buf))!=-1){
                buf.flip();
                System.out.println(new String(buf.array(),0,len));
                buf.clear();
            }
            inChannel.close();
            sChannel.close();
        }
    
        //服务端
        @Test
        public void server() throws IOException{
            ServerSocketChannel ssChannel=ServerSocketChannel.open();
            FileChannel outChannel=FileChannel.open(Paths.get("2.jpg"), StandardOpenOption.WRITE,StandardOpenOption.CREATE);
            ssChannel.bind(new InetSocketAddress(9898));
            SocketChannel sChannel=ssChannel.accept();
            ByteBuffer buf=ByteBuffer.allocate(1024);
            while(sChannel.read(buf)!=-1){
                buf.flip();
                outChannel.write(buf);
                buf.clear();
            }
    
            //发送反馈给客户端
            buf.put("服务端接收数据成功".getBytes());
            buf.flip();//给为读模式
            sChannel.write(buf);
    
            sChannel.close();
            outChannel.close();
            ssChannel.close();
        }
    }

    SelectionKey
    当调用 register(Selector sel, int ops) 将通道注册选择器时,选择器对通道的监听事件,需要通过第二个参数 ops 指定。
    可以监听的事件类型(用 可使用 SelectionKey 的四个常量 表示):
     读 : SelectionKey.OP_READ (1)
     写 : SelectionKey.OP_WRITE (4)
     连接 : SelectionKey.OP_CONNECT (8)
     接收 : SelectionKey.OP_ACCEPT (16)
    若注册时不止监听一个事件,则可以使用“位或”操作符连接。

    SelectionKey:表示 SelectableChannel 和 Selector 之间的注册关系。每次向选择器注册通道时就会选择一个事件(选择键)。选择键包含两个表示为整数值的操作集。操作集的每一位都表示该键的通道所支持的一类可选择操作。

    这里写图片描述

    Selector 的常用方法
    这里写图片描述

    public class TestNonBlockingNIO {
        //客户端
        @Test
        public void client()throws IOException{
            //1.获取通道
            SocketChannel sChannel=SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898));
            //2.切换非阻塞模式
            sChannel.configureBlocking(false);
            //3.分配指定大小的缓冲区
            ByteBuffer buf=ByteBuffer.allocate(1024);
            //4.发送数据给服务端
            Scanner scan=new Scanner(System.in);
            while(scan.hasNext()){
                String str=scan.next();
                buf.put((new Date().toString()+"\n"+str).getBytes());
                buf.flip();
                sChannel.write(buf);
                buf.clear();
            }
            //5.关闭通道
            sChannel.close();
        }
    
        //服务端
        @Test
        public void server() throws IOException{
            //1.获取通道
            ServerSocketChannel ssChannel=ServerSocketChannel.open();
    
            //2.切换非阻塞式模式
            ssChannel.configureBlocking(false);
    
            //3.绑定连接
            ssChannel.bind(new InetSocketAddress(9898));
    
            //4.获取选择器
            Selector selector=Selector.open();
    
            //5.将通道注册到选择器上,并且指定“监听接收事件”
            ssChannel.register(selector,SelectionKey.OP_ACCEPT);
    
            //6.轮询式的获取选择器上已经“准备就绪”的事件
            while(selector.select()>0){
    
                //7.获取当前选择器中所有注册的“选择键(已就绪的监听事件)”
                Iterator<SelectionKey> it=selector.selectedKeys().iterator();
    
                while(it.hasNext()){
                    //8.获取准备“就绪”的事件
                    SelectionKey sk=it.next();
    
                    //9.判断具体是什么时间准备就绪
                    if(sk.isAcceptable()){
                        //10.若“接收就绪”,获取客户端连接
                        SocketChannel sChannel=ssChannel.accept();
    
                        //11.切换非阻塞模式
                        sChannel.configureBlocking(false);
    
                        //12.将该通道注册到选择器上
                        sChannel.register(selector, SelectionKey.OP_READ);
                    }else if(sk.isReadable()){
                        //13.获取当前选择器上“读就绪”状态的通道
                        SocketChannel sChannel=(SocketChannel)sk.channel();
                        //14.读取数据
                        ByteBuffer buf=ByteBuffer.allocate(1024);
                        int len=0;
                        while((len=sChannel.read(buf))>0){
                            buf.flip();
                            System.out.println(new String(buf.array(),0,len));
                            buf.clear();
                        }
                    }
                    //15.取消选择键SelectionKey
                    it.remove();
                }
            }
        }
    }

    DatagramChannel
    Java NIO中的DatagramChannel是一个能收发UDP包的通道。

    public class TestNonBlockNIO2 {
        @Test
        public void send() throws IOException{
            DatagramChannel dc=DatagramChannel.open();
            dc.configureBlocking(false);
            ByteBuffer buf=ByteBuffer.allocate(1024);
            Scanner scan=new Scanner(System.in);
            while(scan.hasNext()){
                String str=scan.next();
                buf.put((new Date().toString()+"\n"+str).getBytes());
                buf.flip();
                dc.send(buf, new InetSocketAddress("127.0.0.1", 9898));
                buf.clear();
            }
            dc.close();
        }
    
        @Test
        public void receive() throws IOException{
            DatagramChannel dc=DatagramChannel.open();
            dc.configureBlocking(false);
            dc.bind(new InetSocketAddress(9898));
            Selector selector=Selector.open();
            dc.register(selector, SelectionKey.OP_READ);
            while(selector.select()>0){
                Iterator<SelectionKey> it=selector.selectedKeys().iterator();
                while(it.hasNext()){
                    SelectionKey sk=it.next();
    
                    if(sk.isReadable()){
                        ByteBuffer buf=ByteBuffer.allocate(1024);
                        dc.receive(buf)
    ;
                        buf.flip();
                        System.out.println(new String(buf.array(),0,buf.limit()));
                        buf.clear();
                    }
                }
                it.remove();
            }
        }
    }

    管道 (Pipe)
    Java NIO 管道是2个线程之间的单向数据连接。Pipe有一个source通道和一个sink通道。数据会被写到sink通道,从source通道读取。

    这里写图片描述

    public class TestPipe {
        @Test
        public void test1()throws IOException{
            //1.获取管道
            Pipe pipe=Pipe.open();
            //2.将缓冲区中的数据写入管道
            ByteBuffer buf=ByteBuffer.allocate(1024);
            Pipe.SinkChannel sinkChannel=pipe.sink();
            buf.put("通过单向管道发送数据".getBytes());
            buf.flip();
            sinkChannel.write(buf);
    
            //3.读取缓冲区中的数据
            Pipe.SourceChannel sourceChannel=pipe.source();
            buf.flip();
            int len=sourceChannel.read(buf);
            System.out.println(new String(buf.array(),0,len));
    
            sourceChannel.close();
            sinkChannel.close();
        }
    }

    三、NIO.2 – Path 、Paths 、Files

    Path 与 Paths

    • java.nio.file.Path 接口代表一个平台无关的平台路径,描述了目录结构中文件的位置。
    • Paths 提供的 get() 方法用来获取 Path 对象:Path get(String first, String … more) : 用于将多个字符串串连成路径。
    • Path 常用方法:
      • boolean endsWith(String path) : 判断是否以 path 路径结束
      • boolean startsWith(String path) : 判断是否以 path 路径开始
      • boolean isAbsolute() : 判断是否是绝对路径
      • Path getFileName() : 返回与调用 Path 对象关联的文件名
      • Path getName(int idx) : 返回的指定索引位置 idx 的路径名称
      • int getNameCount() : 返回Path 根目录后面元素的数量
      • Path getParent() :返回Path对象包含整个路径,不包含Path 对象指定的文件路径
      • Path getRoot() :返回调用 Path 对象的根路径
      • Path resolve(Path p) :将相对路径解析为绝对路径
      • Path toAbsolutePath() : 作为绝对路径返回调用 Path 对象
      • String toString() : 返回调用 Path 对象的字符串表示形式

    Files 类
    java.nio.file.Files 用于操作文件或目录的工具类。

    • Files常用方法:
      • Path copy(Path src, Path dest, CopyOption … how) : 文件的复制
      • Path createDirectory(Path path, FileAttribute< ? > … attr) : 创建一个目录
      • Path createFile(Path path, FileAttribute< ? > … arr) : 创建一个文件
      • void delete(Path path) : 删除一个文件
      • Path move(Path src, Path dest, CopyOption…how) : 将 src 移动到 dest 位置
      • long size(Path path) : 返回 path 指定文件的大小
    • Files常用方法:用于判断

      • boolean exists(Path path, LinkOption … opts) : 判断文件是否存在
      • boolean isDirectory(Path path, LinkOption … opts) : 判断是否是目录
      • boolean isExecutable(Path path) : 判断是否是可执行文件
      • boolean isHidden(Path path) : 判断是否是隐藏文件
      • boolean isReadable(Path path) : 判断文件是否可读
      • boolean isWritable(Path path) : 判断文件是否可写
      • boolean notExists(Path path, LinkOption … opts) : 判断文件是否不存在
      • public static < A extends BasicFileAttributes> A readAttributes(Path path,Class< A > type,LinkOption…options) : 获取与 path 指定的文件相关联的属性。
    • Files常用方法:用于操作内容

      • SeekableByteChannel newByteChannel(Path path, OpenOption…how) : 获取与指定文件的连接,how 指定打开方式。
      • DirectoryStream newDirectoryStream(Path path) : 打开 path 指定的目录
      • InputStream newInputStream(Path path, OpenOption…how):获取 InputStream 对象
      • OutputStream newOutputStream(Path path, OpenOption…how) : 获取 OutputStream 对象
    展开全文
  • 尚硅谷NIO百度云连接

    2017-02-15 17:54:10
    尚硅谷NIO百度云连接
  • 1、java NIO概述 Java NIO(New IO 或 Non Blocking IO)是从 Java 1.4 版本开始引入的一个新的 IO API,可以替代标准的 Java IO API。NIO 支持面向缓冲区的、基于通道的 IO 操 作。NIO 将以更加高效的方式进行文件...

    1、java NIO概述

    Java NIO(New IO 或 Non Blocking IO)是从 Java 1.4 版本开始引入的一个新的IO API,可以替代标准的 Java IO API。
    NIO 支持面向缓冲区的、基于通道的 IO 操作。NIO 将以更加高效的方式进行文件的读写操作。

    1.1阻塞IO

    通常在进行同步 I/O 操作时,如果读取数据,代码会阻塞直至有可供读取的数据。同样,写入调用将会阻塞直至数据能够写入。
    传统的 Server/Client 模式会基于 TPR(Thread per Request),服务器会为每个客户端请求建立一个线程,由该线程单独负责处理一个客户请求。
    这种模式带来的一个问题就是线程数量的剧增,大量的线程会增大服务器的开销。大多数的实现为了避免这个问题,都采用了线程池模型,并设置线程池线程的最大数量,这由带来了新的问题,如果线程池中有 100 个线程,而有100 个用户都在进行大文件下载,会导致第 101 个用户的请求无法及时处理,即便第101 个用户只想请求一个几 KB 大小的页面。

    传统的 Server/Client 模式如下图所示:
    在这里插入图片描述

    1.2非阻塞 IO(NIO)

    NIO 中非阻塞 I/O 采用了基于 Reactor 模式的工作方式,I/O 调用不会被阻塞,相反是注册感兴趣的特定 I/O 事件,如可读数据到达,新的套接字连接等等,在发生特定事件时,系统再通知我们。NIO 中实现非阻塞 I/O 的核心对象就是 Selector,Selector 就是注册各种 I/O 事件地方,而且当我们感兴趣的事件发生时,就是这个对象告诉我们所发生的事件,如下图所示:
    在这里插入图片描述
    从图中可以看出,当有读或写等任何注册的事件发生时,可以从 Selector 中获得相应的 SelectionKey,同时从 SelectionKey 中可以找到发生的事件和该事件所发生的具体的 SelectableChannel,以获得客户端发送过来的数据。
    非阻塞指的是 IO 事件本身不阻塞,但是获取 IO 事件的 select()方法是需要阻塞等待的.区别是阻塞的 IO 会阻塞在 IO 操作上, NIO 阻塞在事件获取上,没有事件就没有 IO, 从高层次看 IO 就不阻塞了.也就是说只有 IO 已经发生那么我们才评估 IO 是否阻塞,但是select()阻塞的时候 IO 还没有发生,何谈 IO 的阻塞呢?NIO 的本质是延迟 IO 操作到真正发生 IO 的时候,而不是以前的只要 IO 流打开了就一直等待 IO 操作。
    在这里插入图片描述

    1.3 NIO 概述

    Java NIO 由以下几个核心部分组成:

    • Channels
    • Buffers
    • Selectors
      虽然 Java NIO 中除此之外还有很多类和组件,但 Channel,Buffer 和 Selector 构成了核心的 API。其它组件,如 Pipe 和 FileLock,只不过是与三个核心组件共同使用的工具类。

    1.3.1 Channel

    首先说一下 Channel,可以翻译成“通道”。Channel 和 IO 中的 Stream(流)是差不多一个等级的。只不过 Stream 是单向的,譬如:InputStream,OutputStream.而Channel 是双向的,既可以用来进行读操作,又可以用来进行写操作。NIO 中的 Channel 的主要实现有:FileChannel、DatagramChannel、SocketChannel 和 ServerSocketChannel,这里看名字就可以猜出个所以然来:分别可以对应文件 IO、UDP 和 TCP(Server 和 Client)。

    1.3.2 Buffer

    NIO 中的关键 Buffer 实现有:ByteBuffer, CharBuffer, DoubleBuffer, FloatBuffer, IntBuffer, LongBuffer, ShortBuffer,分别对应基本数据类型: byte, char, double, float, int, long, short。

    1.3.3 Selector

    Selector 运行单线程处理多个 Channel,如果你的应用打开了多个通道,但每个连接的流量都很低,使用 Selector 就会很方便。例如在一个聊天服务器中。要使用Selector, 得向 Selector 注册 Channel,然后调用它的 select()方法。这个方法会一直阻塞到某个注册的通道有事件就绪。一旦这个方法返回,线程就可以处理这些事件,事件的例子有如新的连接进来、数据接收等。

    2、Java NIO(Channel)

    2.1 Channel 概述

    Channel 是一个通道,可以通过它读取和写入数据,它就像水管一样,网络数据通过Channel 读取和写入。通道与流的不同之处在于通道是双向的,流只是在一个方向上移动(一个流必须是 InputStream 或者 OutputStream 的子类),而且通道可以用于读、写或者同时用于读写。因为 Channel 是全双工的,所以它可以比流更好地映射底层操作系统的 API。NIO 中通过 channel 封装了对数据源的操作,通过 channel 我们可以操作数据源,但又不必关心数据源的具体物理结构。这个数据源可能是多种的。比如,可以是文件,也可以是网络 socket。在大多数应用中,channel 与文件描述符或者 socket 是一一对应的。Channel 用于在字节缓冲区和位于通道另一侧的实体(通常是一个文件或套接字)之间有效地传输数据。
    Channel 源码

    public interface Channel extends Closeable {
    
        /**
         * Tells whether or not this channel is open.
         *
         * @return <tt>true</tt> if, and only if, this channel is open
         */
        public boolean isOpen();
    
        /**
         * Closes this channel.
         * @throws  IOException  If an I/O error occurs
         */
        public void close() throws IOException;
    
    }
    

    Channel 是一个对象,可以通过它读取和写入数据。拿 NIO 与原来的 I/O 做个比较,通道就像是流。所有数据都通过 Buffer 对象来处理。您永远不会将字节直接写入通道中,相反,您是将数据写入包含一个或者多个字节的缓冲区。同样,您不会直接从通道中读取字节,而是将数据从通道读入缓冲区,再从缓冲区获取这个字节。
    Java NIO 的通道类似流,但又有些不同:

    • 既可以从通道中读取数据,又可以写数据到通道。但流的读写通常是单向的。
    • 通道可以异步地读写。
    • 通道中的数据总是要先读到一个 Buffer,或者总是要从一个 Buffer 中写入。
      正如上面所说,从通道读取数据到缓冲区,从缓冲区写入数据到通道。

    2.2Channel实现

    下面是 Java NIO 中最重要的 Channel 的实现:

    • FileChannel
    • DatagramChannel
    • SocketChannel
    • ServerSocketChannel
      (1) FileChannel 从文件中读写数据。
      (2) DatagramChannel 能通过 UDP 读写网络中的数据。
      (3) SocketChannel 能通过 TCP 读写网络中的数据。
      (4) ServerSocketChannel 可以监听新进来的 TCP 连接,像 Web 服务器那样。对每一个新进来的连接都会创建一个 SocketChannel。
      正如你所看到的,这些通道涵盖了 UDP 和 TCP 网络 IO,以及文件 IO

    2.4FileChannel详解

    2.4.1从FileChannel读数据样例代码

    import java.io.RandomAccessFile;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    
    /**
     * @author lyz
     * @Title: FileChannelDemo
     * @Description:
     * @date 2021/10/14 9:51
     */
    public class FileChannelDemo {
        public static void main(String[] args) throws Exception {
            //打开FileChannel
            RandomAccessFile accessFile = new RandomAccessFile("D:\\testfile\\1.txt", "rw");
            FileChannel channel = accessFile.getChannel();
            //创建一个Buffer对象
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            //从channel中读数据到buffer
            int read = channel.read(buffer);
            while (read != -1){
                //反转读写模式
                buffer.flip();
                System.out.println(new String(buffer.array()));
                buffer.clear();
                read = channel.read(buffer);
            }
            System.out.println("读完了");
            accessFile.close();
        }
    }
    

    2.4.2向FileChannel写数据

    import java.io.RandomAccessFile;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    
    /**
     * @author lyz
     * @Title: FileChannelDemo2
     * @Description:
     * @date 2021/10/14 10:30
     */
    public class FileChannelDemo2 {
        public static void main(String[] args) throws Exception {
            //打开FileChannel
            RandomAccessFile accessFile = new RandomAccessFile("D:\\testfile\\1.txt", "rw");
            FileChannel channel = accessFile.getChannel();
            //创建buffer
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            //向buffer中放入数据
            buffer.put("let's show them who we are".getBytes());
            //反转读写模式
            buffer.flip();
            //将buffer中的数据写入文件
            channel.write(buffer);
            accessFile.close();
            System.out.println("写完了");
        }
    }
    

    注意
    buffer读写时要反转读写模式
    buffer.flip();
    用完 FileChannel 后必须将其关闭。如:
    inChannel.close();

    2.4.3 FileChannel 的 position 方法

    有时可能需要在 FileChannel 的某个特定位置进行数据的读/写操作。可以通过调用position()方法获取 FileChannel 的当前位置。也可以通过调用 position(long pos)方法设置 FileChannel 的当前位置。这里有两个例子:
    long pos = channel.position(); channel.position(pos +123);
    如果将位置设置在文件结束符之后,然后试图从文件通道中读取数据,读方法将返回1 (文件结束标志)。
    如果将位置设置在文件结束符之后,然后向通道中写数据,文件将撑大到当前位置并写入数据。这可能导致“文件空洞”,磁盘上物理文件中写入的数据间有空隙。

    2.4.4 FileChannel 的 size 方法

    FileChannel 实例的 size()方法将返回该实例所关联文件的大小。如:
    long fileSize = channel.size();

    2.4.5 FileChannel 的 truncate 方法

    可以使用 FileChannel.truncate()方法截取一个文件。截取文件时,文件将中指定长度后面的部分将被删除。如:
    channel.truncate(1024);
    这个例子截取文件的前 1024 个字节。

    2.4.6 FileChannel 的 force 方法

    FileChannel.force()方法将通道里尚未写入磁盘的数据强制写到磁盘上。出于性能方面的考虑,操作系统会将数据缓存在内存中,所以无法保证写入到 FileChannel 里的数据一定会即时写到磁盘上。要保证这一点,需要调用 force()方法。
    force()方法有一个 boolean 类型的参数,指明是否同时将文件元数据(权限信息等)写到磁盘上。

    2.4.7 FileChannel 的 transferTo 和 transferFrom 方法

    通道之间的数据传输:
    如果两个通道中有一个是 FileChannel,那你可以直接将数据从一个 channel 传输到另外一个 channel。
    transferFrom测试代码

    import java.io.RandomAccessFile;
    import java.nio.channels.FileChannel;
    
    /**
     * @author lyz
     * @Title: TransferFromDemo
     * @Description:
     * @date 2021/10/14 11:02
     */
    public class TransferFromDemo {
        public static void main(String[] args) throws Exception {
            //创建源FileChannel和目的FileChannel
            RandomAccessFile fromFile = new RandomAccessFile("D:\\testfile\\一周统计.txt", "rw");
            FileChannel fromChannel = fromFile.getChannel();
            RandomAccessFile toFile = new RandomAccessFile("D:\\testfile\\1.txt", "rw");
            FileChannel toChannel = toFile.getChannel();
    
            //fromChannel传输到toChannel
            toChannel.transferFrom(fromChannel, 0, fromChannel.size());
    
            fromChannel.close();
            toChannel.close();
            System.out.println("over");
        }
    }
    

    transferTo测试代码

    import java.io.RandomAccessFile;
    import java.nio.channels.FileChannel;
    
    /**
     * @author lyz
     * @Title: TransferToDemo
     * @Description:
     * @date 2021/10/14 11:02
     */
    public class TransferToDemo {
        public static void main(String[] args) throws Exception {
            //创建源FileChannel和目的FileChannel
            RandomAccessFile fromFile = new RandomAccessFile("D:\\testfile\\一周统计.txt", "rw");
            FileChannel fromChannel = fromFile.getChannel();
            RandomAccessFile toFile = new RandomAccessFile("D:\\testfile\\2.txt", "rw");
            FileChannel toChannel = toFile.getChannel();
    
            //fromChannel传输到toChannel
            fromChannel.transferTo(0, fromChannel.size(), toChannel);
    
            fromChannel.close();
            toChannel.close();
            System.out.println("over");
        }
    }
    

    2.5Socket 通道

    (1)	新的 socket 通道类可以运行非阻塞模式并且是可选择的,可以激活大程序(如网络服务器和中间件组件)巨大的可伸缩性和灵活性。本节中我们会看到,再也没有为每个 socket 连接使用一个线程的必要了,也避免了管理大量线程所需的上下文交换开销。借助新的 NIO 类,一个或几个线程就可以管理成百上千的活动 socket 连接了并且只有很少甚至可能没有性能损失。所有的 socket 通道类(DatagramChannel、
    SocketChannel 和 ServerSocketChannel)都继承了位于 java.nio.channels.spi 包中的 AbstractSelectableChannel。这意味着我们可以用一个 Selector 对象来执行 socket 通道的就绪选择(readiness selection)。 
    (2)	请注意 DatagramChannel 和 SocketChannel 实现定义读和写功能的接口而 ServerSocketChannel 不实现。ServerSocketChannel 负责监听传入的连接和创建新的 SocketChannel 对象,它本身从不传输数据。 
    (3)	在我们具体讨论每一种 socket 通道前,您应该了解 socket 和 socket 通道之间的关系。通道是一个连接 I/O 服务导管并提供与该服务交互的方法。就某个 socket 而言,它不会再次实现与之对应的 socket 通道类中的 socket 协议 API,而 java.net 中已经存在的 socket 通道都可以被大多数协议操作重复使用。 
    全部 socket 通道类(DatagramChannel、SocketChannel 和
    ServerSocketChannel)在被实例化时都会创建一个对等 socket 对象。这些是我们所
    熟悉的来自 java.net 的类(Socket、ServerSocket 和 DatagramSocket),它们已经被更新以识别通道。对等 socket 可以通过调用 socket( )方法从一个通道上获取。
    此外,这三个 java.net 类现在都有 getChannel( )方法。 
    (4)	要把一个 socket 通道置于非阻塞模式,我们要依靠所有 socket 通道类的公有超级类:SelectableChannel。就绪选择(readiness selection)是一种可以用来查询通道的机制,该查询可以判断通道是否准备好执行一个目标操作,如读或写。非阻
    塞 I/O 和可选择性是紧密相连的,那也正是管理阻塞模式的 API 代码要在
    SelectableChannel 超级类中定义的原因。 
    

    设置或重新设置一个通道的阻塞模式是很简单的,只要调用 configureBlocking( )方法即可,传递参数值为 true 则设为阻塞模式,参数值为 false 值设为非阻塞模式。可以通过调用 isBlocking( )方法来判断某个 socket 通道当前处于哪种模式。

    /**
         * Adjusts this channel's blocking mode.
         */
        public final SelectableChannel configureBlocking(boolean block)
            throws IOException
        {
            synchronized (regLock) {
                if (!isOpen())
                    throw new ClosedChannelException();
                if (blocking == block)
                    return this;
                if (block && haveValidKeys())
                    throw new IllegalBlockingModeException();
                implConfigureBlocking(block);
                blocking = block;
            }
            return this;
        }
    

    非阻塞 socket 通常被认为是服务端使用的,因为它们使同时管理很多 socket 通道变得更容易。但是,在客户端使用一个或几个非阻塞模式的 socket 通道也是有益处的,例如,借助非阻塞 socket 通道,GUI 程序可以专注于用户请求并且同时维护与一个或多个服务器的会话。在很多程序上,非阻塞模式都是有用的。
    偶尔地,我们也会需要防止 socket 通道的阻塞模式被更改。API 中有一个blockingLock( )方法,该方法会返回一个非透明的对象引用。返回的对象是通道实现修改阻塞模式时内部使用的。只有拥有此对象的锁的线程才能更改通道的阻塞模式。
    下面分别介绍这 3 个通道

    2.5.1ServerSocketChannel

    ServerSocketChannel 是一个基于通道的 socket 监听器。它同我们所熟悉的
    java.net.ServerSocket 执行相同的任务,不过它增加了通道语义,因此能够在非阻塞模式下运行。 
    由于 ServerSocketChannel 没有 bind()方法,因此有必要取出对等的 socket 并使用它来绑定到一个端口以开始监听连接。我们也是使用对等 ServerSocket 的 API 来根据需要设置其他的 socket 选项。 
    同 java.net.ServerSocket 一样,ServerSocketChannel 也有 accept( )方法。一旦创建了一个 ServerSocketChannel 并用对等 socket 绑定了它,然后您就可以在其中一个上调用 accept()。如果您选择在 ServerSocket 上调用 accept( )方法,那么它会同任何其他的 ServerSocket 表现一样的行为:总是阻塞并返回一个 java.net.Socket 对
    象。如果您选择在 ServerSocketChannel 上调用 accept( )方法则会返回
    SocketChannel 类型的对象,返回的对象能够在非阻塞模式下运行。 
    换句话说: 
    ServerSocketChannel 的 accept()方法会返回 SocketChannel 类型对象,
    SocketChannel 可以在非阻塞模式下运行。 
    其它 Socket 的 accept()方法会阻塞返回一个 Socket 对象。如果
    ServerSocketChannel 以非阻塞模式被调用,当没有传入连接在等待时,
    ServerSocketChannel.accept( )会立即返回 null。正是这种检查连接而不阻塞的能力实现了可伸缩性并降低了复杂性。可选择性也因此得到实现。我们可以使用一个选择器实例来注册 ServerSocketChannel 对象以实现新连接到达时自动通知的功能。
    

    代码演示

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author lyz
     * @Title: ServerSocketChannelDemo
     * @Description:
     * @date 2021/10/14 12:47
     */
    public class ServerSocketChannelDemo {
    
        public static void main(String[] args) throws IOException, InterruptedException {
            //将字节数组放入ByteBuffer
            ByteBuffer buffer = ByteBuffer.wrap("this is server socket channel demo.".getBytes());
            //ServerSocketChannel监听8888端口
            ServerSocketChannel socketChannel = ServerSocketChannel.open();
            socketChannel.socket().bind(new InetSocketAddress(8888));
            //设置模式为非阻塞
            socketChannel.configureBlocking(false);
            while (true){
                System.out.println("wait for connections");
                SocketChannel sc = socketChannel.accept();
                if (sc == null){//没有请求进入
                    System.out.println("null");
                    TimeUnit.SECONDS.sleep(2);
                } else {
                    System.out.println("incoming request from address:" + sc.socket().getRemoteSocketAddress());
                    buffer.rewind();//指针0
                    sc.write(buffer);
                    sc.close();
                }
            }
        }
    }
    

    在非阻塞模式下,accept() 方法会立刻返回,如果还没有新进来的连接,返回的将是 null。

    2.5.2SocketChannel

    Java NIO 中的 SocketChannel 是一个连接到 TCP 网络套接字的通道。
    A selectable channel for stream-oriented connecting sockets.
    以上是 Java docs 中对于 SocketChannel 的描述:SocketChannel 是一种面向流连接 sockets 套接字的可选择通道。从这里可以看出:
    • SocketChannel 是用来连接 Socket 套接字
    • SocketChannel 主要用途用来处理网络 I/O 的通道
    • SocketChannel 是基于 TCP 连接传输
    • SocketChannel 实现了可选择通道,可以被多路复用的
    SocketChannel 特征:

    (1)	对于已经存在的 socket 不能创建 SocketChannel 
    (2)	SocketChannel 中提供的 open 接口创建的 Channel 并没有进行网络级联,需要使用 connect 接口连接到指定地址 
    (3)	未进行连接的 SocketChannle 执行 I/O 操作时,会抛出NotYetConnectedException 
    (4)	SocketChannel 支持两种 I/O 模式:阻塞式和非阻塞式 
    (5)	SocketChannel 支持异步关闭。如果 SocketChannel 在一个线程上 read 阻塞,另一个线程对该 SocketChannel 调用 shutdownInput,则读阻塞的线程将返回-1 表示没有读取任何数据;如果 SocketChannel 在一个线程上 write 阻塞,另一个线程对该
    SocketChannel 调用 shutdownWrite,则写阻塞的线程将抛出AsynchronousCloseException 
    (6)	SocketChannel 支持设定参数 
    SO_SNDBUF 套接字发送缓冲区大小 
    SO_RCVBUF 套接字接收缓冲区大小 
    SO_KEEPALIVE 	保活连接 
    O_REUSEADDR 	复用地址 
    SO_LINGER 有数据传输时延缓关闭 Channel (只有在非阻塞模式下有用) 
    TCP_NODELAY 	禁用 Nagle 算法 
    

    代码演示

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SocketChannel;
    
    /**
     * @author lyz
     * @Title: SocketChannelDemo
     * @Description:
     * @date 2021/10/14 13:59
     */
    public class SocketChannelDemo {
    
        public static void main(String[] args) throws IOException {
            SocketChannel socketChannel = SocketChannel.open();
            socketChannel.connect(new InetSocketAddress("www.baidu.com", 80));
            socketChannel.configureBlocking(false);
    
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            socketChannel.read(buffer);
            socketChannel.close();
            System.out.println("read over");
        }
    }
    

    .5.3 DatagramChannel

    正如 SocketChannel 对应 Socket,ServerSocketChannel 对应 ServerSocket,每一个 DatagramChannel 对象也有一个关联的 DatagramSocket 对象。正如 SocketChannel 模拟连接导向的流协议(如 TCP/IP),DatagramChannel 则模拟包导向的无连接协议(如 UDP/IP)。DatagramChannel 是无连接的,每个数据报(datagram)都是一个自包含的实体,拥有它自己的目的地址及不依赖其他数据报的数据负载。与面向流的的 socket 不同,DatagramChannel 可以发送单独的数据报给不同的目的地址。同样,DatagramChannel 对象也可以接收来自任意地址的数据包。每个到达的数据报都含有关于它来自何处的信息(源地址)
    样例代码:

    import org.junit.Test;
    
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.DatagramChannel;
    import java.nio.charset.Charset;
    import java.util.Date;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author lyz
     * @Title: DatagramChannelDemo
     * @Description:
     * @date 2021/10/14 14:39
     */
    public class DatagramChannelDemo {
    
        @Test
        public void sendMessage() throws Exception {
            DatagramChannel datagramChannel = DatagramChannel.open();
            datagramChannel.socket().bind(new InetSocketAddress("127.0.0.1", 9999));
    
            while (true){
                String message = "发送消息" + new Date();
                ByteBuffer buffer = ByteBuffer.wrap(message.getBytes("utf-8"));
                datagramChannel.send(buffer, new InetSocketAddress("127.0.0.1", 9998));
                System.out.println("消息已发送");
                TimeUnit.SECONDS.sleep(1);
            }
        }
    
        @Test
        public void receiveMessage() throws Exception {
            DatagramChannel receiveChannel = DatagramChannel.open();
            receiveChannel.socket().bind(new InetSocketAddress("127.0.0.1", 9998));
    
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            while (true){
                buffer.clear();
                receiveChannel.receive(buffer);
                buffer.flip();
                System.out.println(Charset.forName("utf-8").decode(buffer));
            }
        }
        
    	@Test
        public void connect() throws Exception {
            DatagramChannel datagramChannel = DatagramChannel.open();
            datagramChannel.bind(new InetSocketAddress("127.0.0.1", 9998));
            datagramChannel.connect(new InetSocketAddress("127.0.0.1", 9998));
    
            String message = "发送消息" + new Date();
            datagramChannel.write(ByteBuffer.wrap(message.getBytes("utf-8")));
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            while (true) {
                buffer.clear();
                datagramChannel.read(buffer);
                buffer.flip();
                System.out.println(Charset.forName("utf-8").decode(buffer));
            }
        }
    }
    
    展开全文
  • 24 尚硅谷Java NIO视频

    2019-08-07 16:47:29
    教程视频:NIO是基于通道(Channel)和缓冲区(Buffer)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中
  • 尚硅谷JUC笔记

    2021-01-07 15:15:43
    ) Java IO 和 NIO 2、JUC是什么? 简单来说JUC就是java.util.concurrent并发包 3、回顾进程和线程 1).进程和线程是什么? 答:进程是计算机分配资源的基本单位,它是一个具有独立功能的程序,例如QQ运行起来就是一...

    JUC

    第一集:JUC介绍

    1、面试常见问题类别

    • 面向对象、高级语法(1、抽象类和接口有什么区别?2、Java中获得多线程有几种方法?)
    • Java集合类(1、java.util.*中的ArrayList、Vector、LinkedList、TreeMap、HashSet、HashMap、ConcurrentHashMap、CopyOnWriteHashMap、Queue等?2、谈谈你对HashMap的理解,什么是Hash算法,什么是Hash碰撞?)
    • Java多线程(1、Java中获得多线程的方法有几种?)
    • Java IO 和 NIO

    2、JUC是什么?

    在这里插入图片描述

    简单来说JUC就是java.util.concurrent并发包

    3、回顾进程和线程

    1).进程和线程是什么?

    答:进程是计算机分配资源的基本单位,它是一个具有独立功能的程序,例如QQ运行起来就是一个大的进程,这可以在任务管理器中看到;线程是程序执行的最小单位,一个进程里面可以有很多个线程,这些线程共享进程中的资源。

    2).进程和线程举例?

    答:例如QQ运行起来就是一个大的进程,然后里面有天气预报,我们可以同时进行文字聊天和视频,这些都是小的线程

    3).线程状态?

    答: JVM中的线程状态

    4).wait和sleep的区别?

    答:wait睡眠的时候会放开手中的锁,而sleep睡眠的时候会带着手中的锁,wait和sleep都可以设置睡眠时间,那他们的线程进入的是TIMED_WAITING状态,如果wait不设置睡眠时间,那需要使用其他方法使用共享对象.notify()或者共享对象.notifyAll()方法唤醒,并且进入的是WAITING状态

    5).并发和并行分别是什么?

    答:并发:同一时刻多个线程访问同一个资源,例如:例子:小米9限量抢购、春运抢票、电商秒杀…
    并行:同一时刻多个线程同时访问不同资源

    第二集:卖票复习

    1、多线程编程的企业级套路+模板

    1).最重要的三句话

    1. 在高内聚低耦合的前提下,线程 操作(资源类中对外暴露的方法) 资源类
    2. 先判断,然后干活,最后通知
    3. 在多线程交互中,必须要防止多线程的虚假唤醒,也即判断只能用while,不能用if

    2).第一句话的例子

    题目:

    三个售票员 卖出 30张票

    使用synchronized编写代码:

    // 资源类
    class Ticket {
        // 共享资源
        private int number = 30;
        
        // 操作
        public synchronized void saleTicket() {
            if (number > 0) {
                System.out.println(Thread.currentThread().getName() + "卖出了第" + number-- + "张票");
            }
        }
    }
    
    public class Test {
        public static void main(String[] args) {
            // 创建资源类
            Ticket ticket = new Ticket();
            // 创建线程
            new Thread(() -> {for (int i = 0; i < 40; i++) ticket.saleTicket();}, "A").start();
            new Thread(() -> {for (int i = 0; i < 40; i++) ticket.saleTicket();}, "B").start();
            new Thread(() -> {for (int i = 0; i < 40; i++) ticket.saleTicket();}, "C").start();
        }
    }
    

    使用synchronized编写代码的解释:

    由于结果太长,所以就不给大家看了,我们创建了Ticket资源类,里面放着共享资源number,并且资源类中包含着操作saleTicket(),然后我们在main()方法中创建资源类对象,然后创建线程A、B、C同时去利用Ticket资源类对象调用saleTicket(),当然了saleTicket()方法上加的有synchronized关键字,由于A、B、C线程操作的都是同一个Ticket资源类对象,所以synchronized关键字锁住的就是该对象,因此不会出现多卖票的问题

    使用Lock锁编写代码:

    // 资源类
    class Ticket {
        // 共享资源
        private int number = 30;
        // 创建锁
        private final Lock lock = new ReentrantLock();
    
        // 操作
        public void saleTicket() {
            // 加锁
            lock.lock();
            try {
                // 业务代码
                if (number > 0) {
                    System.out.println(Thread.currentThread().getName() + "卖出了第" + number-- + "张票");
                }
            } finally {
                // 解锁
                lock.unlock();
            }
        }
    }
    
    public class Test {
        public static void main(String[] args) {
            // 创建资源类
            Ticket ticket = new Ticket();
            // 创建线程
            new Thread(() -> {for (int i = 0; i < 40; i++) ticket.saleTicket();}, "A").start();
            new Thread(() -> {for (int i = 0; i < 40; i++) ticket.saleTicket();}, "B").start();
            new Thread(() -> {for (int i = 0; i < 40; i++) ticket.saleTicket();}, "C").start();
        }
    }
    

    使用Lock锁编写代码的解释:

    加锁之后锁的不是某个对象,而是锁的这段代码,也就是说同一时刻有且只能有一个线程来执行加锁的这段代码,不过记着finally中解锁,使用finally的目的是为了保证在业务代码发生异常的时候依然可以解锁成功。其实这种做法和synchronized(this){***}还是有很多相似之处的,但是synchronized同步锁还是需要锁着线程共享的东西,比如前面写的this代表共享对象ticket,只要锁着的是多个线程共享的东西就可以了

    3).第二句话、第三句话的例子

    题目:

    现在有四个线程去操作一个初始值为0的变量,要求实现其中两个线程对该变量加1,另外两个线程对该变量减1,要求变量的值只能是0和1,并且要进行交替展示,每个线程来10轮操作

    使用synchronized、wait、notifyAll来解决问题的代码:

    // 资源类
    class Resource {
        private int number = 0;
        // 加1操作
        @SneakyThrows // 不用抛出异常
        public synchronized void increase() {
            // 判断
            while (number == 1) {
                this.wait();
            }
            // 干活
            number++;
            System.out.println(Thread.currentThread().getName() + ":" + number);
            // 通知
            this.notifyAll();
        }
        // 减1操作
        @SneakyThrows
        public synchronized void decrease() {
            // 判断
            while (number == 0) {
                this.wait();
            }
            // 干活
            number--;
            System.out.println(Thread.currentThread().getName() + ":" + number);
            // 通知
            this.notifyAll();
        }
    }
    
    public class Test {
        public static void main(String[] args) {
            // 创建资源类对象
            Resource resource = new Resource();
            // A线程和B线程加
            new Thread(() -> {for (int i = 0; i < 10; i++)resource.increase();}, "A").start();
            new Thread(() -> {for (int i = 0; i < 10; i++)resource.increase();}, "B").start();
            // C线程和D线程加
            new Thread(() -> {for (int i = 0; i < 10; i++)resource.decrease();}, "C").start();
            new Thread(() -> {for (int i = 0; i < 10; i++)resource.decrease();}, "D").start();
        }
    }
    

    使用synchronized、wait、notifyAll来解决问题的代码解释:

    4个线程中两个加1两个减1,那肯定要在加1操作方法和减1操作方法中判断是否要进行加1或者减1,如果不加1或者减1的话那就要把同步锁让出来给其他线程,因此我们需要判断是否应该执行这个操作,如果不符合条件的话那就需要使用wait()方法睡眠,如果符合条件那就干活,干完活后唤醒其他线程,其他线程在开始争抢cpu时间片,至于为什么使用while而不用if判断呢?我来给你解释,现在假设A线程执行第一次判断发现可以加,那number就变成了1,之后唤醒其他线程,假设当A线程释放锁之后B线程抢到了锁,然后他一判断不能做蛋糕,那就执行wait()方法,假设当B线程释放锁之后A线程抢到了锁,然后他一判断还是不能做蛋糕,那就执行wait()方法,假设A线程释放锁之后C线程获得了锁,她属于顾客,然后她取走了一个蛋糕,那number就变成了0,之后唤醒其他线程,假设当C线程释放锁之后A线程获得了锁,然后A线程生产蛋糕,number就变成了1,之后唤醒其他线程,假设当A线程释放锁之后B线程抢到了锁,如果使用的是if判断,由于之前B线程执行了wait()方法,那它会从wait()方法之后执行,那么B先生会继续做蛋糕,这就违背了题目要求,所以不能使用if,而是应该使用while,假设还是上面的情况,即当A线程释放锁之后B线程抢到了锁,如果使用的是while判断,那就会在经过一次判断,一经判断发现number是1,那就不能做蛋糕,还是只能执行wait()方法,所以使用while判断可以完美解决虚假唤醒问题

    使用Lock、await、signalAll来解决问题的代码:

    // 资源类
    class Resource {
        private int number = 0;
        private final Lock lock = new ReentrantLock();
        private final Condition condition = lock.newCondition();
    
        // 加1操作
        @SneakyThrows // 不用抛出异常
        public void increase() {
            // 加锁
            lock.lock();
            try {
                // 判断
                while (number == 1) {
                    condition.await();
                }
                // 干活
                number++;
                System.out.println(Thread.currentThread().getName() + ":" + number);
                // 通知
                condition.signalAll();
            } finally {
                lock.unlock();
            }
        }
    
        // 减1操作
        @SneakyThrows
        public void decrease() {
            // 加锁
            lock.lock();
            try {
                // 判断
                while (number == 0) {
                    condition.await();
                }
                // 干活
                number--;
                System.out.println(Thread.currentThread().getName() + ":" + number);
                // 通知
                condition.signalAll();
            } finally {
                lock.unlock();
    
            }
        }
    }
    
    public class Test {
        public static void main(String[] args) {
            // 创建资源类对象
            Resource resource = new Resource();
            // A线程和B线程加
            new Thread(() -> {for (int i = 0; i < 10; i++)resource.increase();}, "A").start();
            new Thread(() -> {for (int i = 0; i < 10; i++)resource.increase();}, "B").start();
            // C线程和D线程加
            new Thread(() -> {for (int i = 0; i < 10; i++)resource.decrease();}, "C").start();
            new Thread(() -> {for (int i = 0; i < 10; i++)resource.decrease();}, "D").start();
        }
    }
    

    使用Lock、await、signalAll来解决问题的代码的解释:

    之前用wait()和notifyAll()来做的,现在用private final Condition condition = lock.newCondition();创建出来的condition调用await()和signalAll()来做,作用都是一样的,只不过之前的synchronized、wait、notifyAll铁三角换成了Lock、await、signalAll铁三角

    4).精准通知顺序操作(为什么要用Lock取代synchronized的原因)

    题目:

    多线程之间严格按照顺序调用,实现A线程—》B线程—》C线程,当三个线程启动之后,要求如下:

    AA打印5次,BB打印10次,CC打印15次
    接着
    AA打印5次,BB打印10次,CC打印15次
    ……一共来10轮

    代码:

    // 资源类
    class Resource {
        private int number = 1;
        private final Lock lock = new ReentrantLock();
        // condition1、condition2、condition3分别对应线程AA、BB、CC
        private final Condition condition1 = lock.newCondition();
        private final Condition condition2 = lock.newCondition();
        private final Condition condition3 = lock.newCondition();
    
        @SneakyThrows // 不用抛出异常
        public void print(int size) {
            lock.lock();
            try {
                // 判断
                if (size == 5) {
                    while (number != 1) {
                        condition1.await();
                    }
                }
                if (size == 10) {
                    while (number != 2) {
                        condition2.await();
                    }
                }
                if (size == 15) {
                    while (number != 3) {
                        condition3.await();
                    }
                }
                // 干活
                for (int i = 1; i <= size; i++) {
                    System.out.println(Thread.currentThread().getName()+":"+i);
                }
                // 精准通知
                if (size == 5) {
                    number = 2;
                    condition2.signal();
                }
                if (size == 10) {
                    number = 3;
                    condition3.signal();
                }
                if (size == 15) {
                    number = 1;
                    condition1.signal();
                }
            } finally {
                lock.unlock();
            }
        }
    }
    
    public class Test {
        public static void main(String[] args) {
            // 创建资源类对象
            Resource resource = new Resource();
            // 创建多个线程
            new Thread(()->{for (int i = 0; i < 10;i++) resource.print(5); },"AA").start();
            new Thread(()->{for (int i = 0; i < 10;i++) resource.print(10); },"BB").start();
            new Thread(()->{for (int i = 0; i < 10;i++) resource.print(15); },"CC").start();
        }
    }
    

    解释:

    完美实现效果,可以创建多个Condition类进行操作,实现精准通知,如果使用synchronized铁三角就无法实现相同的效果

    5).lambda表达式写法

    一句话:

    复制小括号,写死右箭头,落地大括号

    解释:

    首先只有函数式接口中的方法才能使用Lambda表达式,而函数式接口就是方法中有且仅有一个方法需要被实现,但是里面还可以有default修饰的方法或者static修饰的静态方法是不影响的,毕竟它们不需要被实现,当然如果接口符合函数式接口的限制条件,并且该接口想成为函数式接口,最好在接口名称上面加上@FunctionalInterface注解,一方面可以表明本接口是函数式接口,另外一方面还可以验证当前接口是否符合函数式接口的约束条件,例如Runnable接口就是函数式接口;复制小括号就是复制那个被实现的方法中的(),例如Runnable接口中的public abstract void run();中的(),写死右箭头就是在小括号后面写上->,落地大括号就是最后写一个{};最后说明lambda表达式的作用就是简写匿名内部类

    例子:

    // 使用匿名内部类实现接口
    new Thread(new Runnable() {
        @Override
        public void run() {
            System.out.println("我是一个线程");
        }
    }).start();
    // 使用Lambda表达式改写匿名内部类
    new Thread(() -> {System.out.println("我是一个线程"); }).start();}
    

    代码解释:

    在这里插入图片描述
    当然只有一条语句的时候,大括号可以省略,另外还有一些其他的规则,例如小括号中有参数的时候可以省略参数类型,如果只有一个参数不仅可以省略参数类型,还可以省略包裹参数的小括号,还有如果只有一条代码的话还可以省略大括号,如果这行代码是return返回值代码不仅可以省略大括号,还可以省略return关键字,直接在->后面写上返回值就可以了,更多细节请看:Lambda表达式详解

    第三集:8锁的现象

    在这里插入图片描述
    解释:

    其实做对这些问题需要了解这么一些事情,首先对象锁和类锁是不一样的,如果一个方法上加了synchronized,那使用的是对象锁,如果一个方法上加了static和synchronized,那使用的是类锁,然后对象锁锁的是一个对象,如果同一个类中的多个方法上加的仅仅有synchronized,并且同一时刻多个线程使用某对象去运行同步方法,但是只会有一个线程运行同步方法,其他线程都要等待;如果同一个类中的多个方法上加的有static和synchronized,那获得的锁就是类锁,类锁也叫做静态同步锁,当然类锁只关注同一个类中同时加的有static和synchronized的那些方法,另外我还想说对象锁和类锁是不同的,他们互不影响,他们是两个锁,没有从属关系,一个线程获得类锁的同时,另外一个线程可以获得对象锁,如果上述所说的类中的方法上没有加synchronized,也没有加static,那就不需要获得对象锁或者类锁,多少个线程同时去调用这些方法都是没有错的;另外还需要知道sleep()方法代表执行该方法的线程是带着锁睡的,醒来的时候线程依然带着锁

    第四集:不安全的ArrayList

    1、ArrayList不安全吗?

    问题:

    请举例说明ArrayList不是线程安全的?

    代码:

    public class Test {
        public static void main(String[] args) {
            List<String> list = new ArrayList<>();
            for (int i = 1; i < 100; i++) {
                new Thread(() -> {
                    // 写入
                    list.add(UUID.randomUUID().toString().substring(0, 8));
                    // 读出
                    System.out.println(list);
                }, String.valueOf(i)).start();
            }
        }
    }
    

    说明:

    会出现java.util.ConcurrentModificationException,该异常俗称并发修改异常,即多个线程需要写入,同时还有多个线程还要读出,所以会出现该异常

    2、ArrayList不安全怎么办?

    方案1:使用Vector类(不采用)

    举例:

    // 除了这一行之外,其他代码的和上面“1、ArrayList不安全吗?”中的测试代码相同
    List<String> list = new Vector<>();
    

    说明:

    加了synchronized,读取效率太低

    方案2:使用Collections.synchronizedList(new ArrayList<>())(适合低并发小数据量的时候使用)

    举例:

    // 除了这一行之外,其他代码的和上面“1、ArrayList不安全吗?”中的测试代码相同
    List<String> list = Collections.synchronizedList(new ArrayList<>());
    

    说明:

    可以把线程不安全的ArrayList对象变成线程安全的对象,其实就是对ArrayList中的每个方法上加synchronized

    在这里插入图片描述
    现在来解释一下里面的原理,我们点进synchronizedList()方法,代码如下:

    public static <T> List<T> synchronizedList(List<T> list) {
        return (list instanceof RandomAccess ?
                new SynchronizedRandomAccessList<>(list) :
                new SynchronizedList<>(list));
    }
    

    由于listnew ArrayList<>(),所以instanceof判断结果是false,所以执行new SynchronizedList<>(list),我们点击SynchronizedList进去看一下,代码如下:

    static class SynchronizedCollection<E> implements Collection<E>, Serializable {
    	private static final long serialVersionUID = 3053995032091335093L;
    
    	final Collection<E> c;  // Backing Collection
    	final Object mutex;     // Object on which to synchronize
    	………………
    }
    
    static class SynchronizedList<E>
    extends SynchronizedCollection<E>
    implements List<E> {
    	private static final long serialVersionUID = -7754090372962971524L;
    
    	// list就是new ArrayList<>()
    	final List<E> list;
    
    	SynchronizedList(List<E> list) {
    		super(list);
    		this.list = list;
    	}
    	// 使用synchronized同步代码块来保证原子性操作
    	public void add(int index, E element) {
    		synchronized (mutex) {list.add(index, element);}
    	}
    }
    

    可以看到代码中用到了synchronized同步代码块,因此会保证操作的原子性

    方案3:使用CopyOnWriteArrayList代替ArrayList(适合多线程高并发大数据量的时候使用)

    举例:

    // 除了这一行之外,其他代码的和上面“1、ArrayList不安全吗?”中的测试代码相同
    List<String> list = new CopyOnWriteArrayList<>();
    

    说明:

    这是写时复制思想, 首先看add()方法中有可重入锁,这个目的是防止多个线程争抢写的权力,然后下面红框中的内容是将原件复制出来一份,然后在复印件上写,之后通过setArray()方法让原件地址指向复印件,这样可以让所有人读原件,而我只修改复印件,所以读和写不会出现冲突,因此通过加锁和写时复制思想可以很好保证了多线程情况下所有线程都可以读,但是只有一个线程在写,因此不会出现并发修改异常,如下图:

    在这里插入图片描述
    以上是说自己的解释,以下是老师的解释:在这里插入图片描述

    第五集:不安全的HashSet

    1、HashSet不安全吗?

    问题:

    请举例说明HashSet不是线程安全的?

    代码:

    public class Test {
        public static void main(String[] args) {
            Set<String> set = new HashSet<>();
            for (int i = 1; i < 100; i++) {
                new Thread(() -> {
                    // 写入
                    set.add(UUID.randomUUID().toString().substring(0, 8));
                    // 读出
                    System.out.println(set);
                }, String.valueOf(i)).start();
            }
        }
    }
    

    说明:

    会出现java.util.ConcurrentModificationException,该异常俗称并发修改异常,即多个线程需要写入,同时还有多个线程还要读出,所以会出现该异常
    另外还需要说明的一点是HashSet的底层是HashMap,可以看源码:

    在这里插入图片描述
    然而HashMap的底层是Hash表,Hash表是数组加单向链表的组成结构

    2、HashSet不安全怎么办?

    方案1:使用Collections.synchronizedSet(new HashSet<>())(适合低并发小数据量的时候使用)

    举例:

    // 除了这一行之外,其他代码的和上面“1、HashSet不安全吗?”中的测试代码相同
    Set<String> set = Collections.synchronizedSet(new HashSet<>());
    

    说明:

    可以把线程不安全的HashSet对象变成线程安全的对象,其实就是对HashSet中的每个方法上加synchronized

    在这里插入图片描述

    方案2:使用CopyOnWriteArraySet代替HashSet(适合多线程高并发大数据量的时候使用)

    举例:

    // 除了这一行之外,其他代码的和上面“1、HashSet不安全吗?”中的测试代码相同
    Set<String> set = new CopyOnWriteArraySet<>();
    

    说明:

    首先CopyOnWriteArraySetadd()方法:

    在这里插入图片描述
    然后点击addIfAbsent()就到了CopyOnWriteArrayList类中:

    在这里插入图片描述
    之后我们点击addIfAbsent()方法后发现CopyOnWriteArrayList类中的该方法内部代码:

    在这里插入图片描述

    第六集:不安全的HashMap

    1、HashMap不安全吗?

    问题:

    请举例说明HashMap不是线程安全的?

    代码:

    public class Test {
        public static void main(String[] args) {
            Map<String, String> map = new HashMap<>();
            for (int i = 1; i < 100; i++) {
                new Thread(() -> {
                    // 写入
                    map.put(UUID.randomUUID().toString().substring(0, 8), UUID.randomUUID().toString().substring(0, 8));
                    // 读出
                    System.out.println(map);
                }, String.valueOf(i)).start();
            }
        }
    }
    

    说明:

    会出现java.util.ConcurrentModificationException,该异常俗称并发修改异常,即多个线程需要写入,同时还有多个线程还要读出,所以会出现该异常

    2、HashMap不安全怎么办?

    方案1:使用HashTable(不采用)

    举例:

    // 除了这一行之外,其他代码的和上面“1、HashMap不安全吗?”中的测试代码相同
    Map<String, String> map = new Hashtable<>();
    

    说明:

    加了synchronized,读取效率太低,一次锁整张Hash表,相当于这个表是表锁,只要当前线程在读或者写,其他线程都不能进行操作,虽然解决了并发问题,但是相当于将并行操作变成了串行操作,另外在进行复合操作的时候也不是线程安全的,比如添加操作是判断是否存在和添加的联合操作,我们不期待这个过程在别人打扰,但是两个子操作中间是有可能被别人打扰的,可能我判断的时候没有这个值,但是在我执行添加操作之前被别人添加上了,然后我再去添加的时候发现有人已经添加了,这不就是添了个寂寞吗,说明添加操作就不是线程安全的

    方案2:使用Collections.synchronizedMap(new HashMap<>())(适合低并发小数据量的时候使用)

    举例:

    // 除了这一行之外,其他代码的和上面“1、HashMap不安全吗?”中的测试代码相同
    Map<String, String> map = Collections.synchronizedMap(new HashMap<>());
    

    说明:

    可以把线程不安全的HashMap对象变成线程安全的对象,其实就是对HashMap中的每个方法上加synchronized
    在这里插入图片描述

    方案3:使用ConcurrentHashMap代替HashMap(适合多线程高并发大数据量的时候使用)

    举例:

    // 除了这一行之外,其他代码的和上面“1、HashMap不安全吗?”中的测试代码相同
    Map<String, String> map = new ConcurrentHashMap<>();
    

    说明:

    在JDK1.7中,ConcurrentHashMap里面使用的是锁分段机制,默认ConcurrentLevel分段级别是16,也就是说默认有16个段Segment段,每个段上有一个锁,每个段后面都是一个哈希表,这样每次就有16个锁了,我们每次操作的是不同的锁,这样比HashTable中的1个锁要好多了,至少不是串行操作了

    在jdk1.8中,ConcurrentHashMap取消了分段锁机制(锁分段机制也是一样的意思,都可以用,没有区别),底层使用CAS算法,CAS算法根本没有使用锁,它使用的是while循环判断,如果修改成功那就结束while循环,否则只要依然有CPU时间片,那就继续执行修改操作,直到执行成功为止,由于JVM为Unsafe类中的CAS方法写成汇编指令,虽然CAS中涉及到获取比较交换三个操作,但是这三个操作是原子操作,中途不会被打断,所以CAS操作是原子性的,并且没有锁,效率更高,由于一直在while循环中,所以会减少线程上下文切换的消耗,但是循环也会带来一些时间开销

    在这里插入图片描述

    第七集:Callable

    代码:

    class MyThread implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            System.out.println("hello MyThread!");
            TimeUnit.SECONDS.sleep(4);
            return 1024;
        }
    }
    
    public class Test {
        public static void main(String[] args) throws Exception {
            FutureTask futureTask = new FutureTask(new MyThread());
            new Thread(futureTask, "A").start();
            System.out.println(futureTask.get());
            System.out.println("hello Test!");
        }
    }
    

    解释:

    首先输出hello MyThread!,经过4秒中之后输出1024,然后输出hello Test!,说明get()方法会阻塞调用该方法的线程,比如本次阻塞的就是主线程,因此我们最好将get()方法往后放,等到主线程中的其他操作完成的时候在通过get()方法获取返回值,毕竟尽量少等会是最好的选择,其实我们都知道Thread类的构建方法中的两个参数其中的第一个参数是Runnable接口,如果我们想使用Runnable接口,那我们需要Callable接口和Runnable接口挂上钩,具体关联如下:

    在这里插入图片描述
    Callable接口和Runnable的不同:

    • 名称不同
    • 重写方法不同,Callable中是call()方法,而Runnable中是run()方法
    • 是否带泛型,其中Callable接口带有泛型,而Runnable接口不带有泛型
    • 是否有返回值,其中Callable接口有返回值,返回值类型和泛型类型一致
    • 构建方式不同,其中Callable接口用的是FutureTask类作为媒介来联系上Runnable接口

    第八集:CountDownLatch

    代码:

    public class Test {
        public static void main(String[] args) throws Exception {
            CountDownLatch countDownLatch = new CountDownLatch(6);
            for (int i = 1; i <= 6; i++) {
                new Thread(()->{
                    System.out.println(Thread.currentThread().getName()+"号学生离开教室");
                    countDownLatch.countDown();
                },String.valueOf(i)).start();
            }
            countDownLatch.await();
            System.out.println("所有学生走完,关门");
        }
    }
    

    结果:

    1号学生离开教室
    6号学生离开教室
    5号学生离开教室
    4号学生离开教室
    3号学生离开教室
    2号学生离开教室
    所有学生走完,关门
    

    解释:

    CountDownLatch countDownLatch = new CountDownLatch(6);代表只有执行6次countDownLatch.countDown();之后总量变成0,执行一次countDownLatch.countDown();就少一个1,然后countDownLatch.await();表示在等待着6变成0的时候,只要变成了0,那么System.out.println("所有学生走完,关门");就会执行

    第九集:CyclicBarrier

    代码:

    public class Test {
        public static void main(String[] args) throws Exception {
            CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> System.out.println("7颗龙珠聚齐,开始召唤神龙!"));
            for (int i = 1; i <= 7; i++) {
                final int finalI = i;
                new Thread(() -> {
                    System.out.println(Thread.currentThread().getName() + "号收集到第" + finalI + "颗龙珠");
                    try {
                        cyclicBarrier.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }, String.valueOf(i)).start();
            }
        }
    }
    

    结果:

    2号收集到第2颗龙珠
    3号收集到第3颗龙珠
    4号收集到第4颗龙珠
    1号收集到第1颗龙珠
    5号收集到第5颗龙珠
    6号收集到第6颗龙珠
    7号收集到第7颗龙珠
    7颗龙珠聚齐,开始召唤神龙!
    

    解释:

    之前说的CountDownLatch是每次减去1,而本次是执行一次await()方法就加一个1,加到7,那就可以执行CyclicBarrier中的第二个参数中的线程了

    第十集:Semaphore

    代码:

    public class Test {
        public static void main(String[] args) throws Exception {
            Semaphore semaphore = new Semaphore(3);
            for (int i = 1; i <= 6; i++) {
                new Thread(()->{
                    try {
                        // 获得信号量
                        semaphore.acquire();
                        System.out.println(Thread.currentThread().getName()+"号车主抢到了车位");
                        // 暂停一会线程,表示占据车位的过程
                        TimeUnit.SECONDS.sleep(3);
                        System.out.println(Thread.currentThread().getName()+"号车主离开了车位");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        // 释放信号量
                        semaphore.release();
                    }
                },String.valueOf(i)).start();
            }
        }
    }
    

    结果:

    3号车主抢到了车位
    1号车主抢到了车位
    2号车主抢到了车位
    2号车主离开了车位
    1号车主离开了车位
    3号车主离开了车位
    4号车主抢到了车位
    5号车主抢到了车位
    6号车主抢到了车位
    4号车主离开了车位
    5号车主离开了车位
    6号车主离开了车位
    

    解释:

    一共准备了3个车位,也就是信号量构造方法中的3,然后有6个车主去抢车位,当3个车主抢到车位之后,另外几位车主只能等待,当有车主移出车位之后其他车主才可以抢车位,其中我们所说的车主就是一个一个的线程

    第十一集:ReentrantReadWriteLock

    代码:

    class MyCache {
        private volatile Map<String,Object> map = new HashMap<>();
        private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    
        // 写入数据
        @SneakyThrows
        public void put(String key, Object value) {
            readWriteLock.writeLock().lock();
            try {
                System.out.println(Thread.currentThread().getName()+"号线程写入数据开始,key="+key);
                map.put(key, value);
                // 模拟程序执行过程,但是不会出现多次连续写
                TimeUnit.MICROSECONDS.sleep(100);
                System.out.println(Thread.currentThread().getName()+"号线程写入数据结束,key="+key);
            } finally {
                readWriteLock.writeLock().unlock();
            }
        }
    
        // 读出数据
        @SneakyThrows
        public void get(String key) {
            readWriteLock.readLock().lock();
            try {
                System.out.println(Thread.currentThread().getName()+"号线程读出数据开始,key="+key);
                Object o = map.get(key);
                // 模拟程序执行过程,可以出现多次连续读
                TimeUnit.MICROSECONDS.sleep(100);
                System.out.println(Thread.currentThread().getName()+"号线程读出数据结束,key="+key);
            } finally {
                readWriteLock.readLock().unlock();
            }
        }
    }
    
    public class Test {
        public static void main(String[] args) throws Exception {
            MyCache myCache = new MyCache();
            for (int i = 1; i <= 5; i++) {
                int finalI = i;
                new Thread(()->{myCache.put(String.valueOf(finalI), String.valueOf(finalI));},String.valueOf(i)).start();
            }
            for (int i = 6; i <= 10; i++) {
                int finalI = i;
                new Thread(()->{myCache.get(String.valueOf(finalI-5));},String.valueOf(finalI)).start();
            }
        }
    }
    

    结果:

    1号线程写入数据开始,key=1
    1号线程写入数据结束,key=1
    3号线程写入数据开始,key=3
    3号线程写入数据结束,key=3
    2号线程写入数据开始,key=2
    2号线程写入数据结束,key=2
    4号线程写入数据开始,key=4
    4号线程写入数据结束,key=4
    5号线程写入数据开始,key=5
    5号线程写入数据结束,key=5
    6号线程读出数据开始,key=1
    7号线程读出数据开始,key=2
    8号线程读出数据开始,key=3
    10号线程读出数据开始,key=5
    9号线程读出数据开始,key=4
    7号线程读出数据结束,key=2
    9号线程读出数据结束,key=4
    8号线程读出数据结束,key=3
    10号线程读出数据结束,key=5
    6号线程读出数据结束,key=1
    

    解释:

    一个线程加上写锁,那其他的线程无论是获取写锁还是获取读锁都只能等待该线程释放写锁,而一个线程加上读锁,那其他的线程获取写锁需要等待,但是多个线程同时获取读锁;
    简单来说就是:写的时候其他线程不能写不能读,而读的时候其他线程可以读但不能写

    第十二集:BlockingQueue

    代码:

    public class Test {
        public static void main(String[] args) throws Exception {
            BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
            /*
            // 1、抛出异常(插入add(e)、移除remove()、检查element())
            // 往阻塞队列中放入元素
            System.out.println(blockingQueue.add("a")); // true
            System.out.println(blockingQueue.add("b")); // true
            System.out.println(blockingQueue.add("c")); // true
            System.out.println(blockingQueue.add("d")); // Exception in thread "main" java.lang.IllegalStateException: Queue full
            // 查看阻塞队列尾部中的元素
            System.out.println(blockingQueue.element()); // a
            // 从阻塞队列中取出元素
            System.out.println(blockingQueue.remove()); // a
            System.out.println(blockingQueue.remove()); // b
            System.out.println(blockingQueue.remove()); // c
            System.out.println(blockingQueue.remove()); // Exception in thread "main" java.util.NoSuchElementException
            */
    
            /*
            // 2、返回布尔值(插入offer(e)、移除poll()、检查peek())
            // 往阻塞队列中放入元素
            System.out.println(blockingQueue.offer("a")); // true
            System.out.println(blockingQueue.offer("b")); // true
            System.out.println(blockingQueue.offer("c")); // true
            System.out.println(blockingQueue.offer("d")); // false
            // 查看阻塞队列尾部中的元素
            System.out.println(blockingQueue.peek()); // a
            // 从阻塞队列中取出元素
            System.out.println(blockingQueue.poll()); // a
            System.out.println(blockingQueue.poll()); // b
            System.out.println(blockingQueue.poll()); // c
            System.out.println(blockingQueue.poll()); // null
            */
    
            /*
            // 3、阻塞(插入put(e)、移除take())
            // 往阻塞队列中放入元素
            blockingQueue.put("a"); // 不阻塞
            blockingQueue.put("b"); // 不阻塞
            blockingQueue.put("c"); // 不阻塞
            blockingQueue.put("d"); // 阻塞
            // 从阻塞队列中取出元素
            System.out.println(blockingQueue.take()); // a
            System.out.println(blockingQueue.take()); // b
            System.out.println(blockingQueue.take()); // c
            System.out.println(blockingQueue.take()); // 阻塞
            */
    
            /*
            // 4、超时(插入offer(e,time,unit)、移除poll(time,unit))
            // 往阻塞队列中放入元素
            System.out.println(blockingQueue.offer("a",1,TimeUnit.SECONDS)); // true
            System.out.println(blockingQueue.offer("b",1,TimeUnit.SECONDS)); // true
            System.out.println(blockingQueue.offer("c",1,TimeUnit.SECONDS)); // true
            System.out.println(blockingQueue.offer("d",1,TimeUnit.SECONDS)); // 等待1秒钟之后返回false
            // 从阻塞队列中取出元素
            System.out.println(blockingQueue.poll(1,TimeUnit.SECONDS)); // a
            System.out.println(blockingQueue.poll(1,TimeUnit.SECONDS)); // b
            System.out.println(blockingQueue.poll(1,TimeUnit.SECONDS)); // c
            System.out.println(blockingQueue.poll(1,TimeUnit.SECONDS)); // 等待1秒钟之后返回null
            */
        }
    }
    

    解释:

    以上方法如下:
    在这里插入图片描述

    阻塞队列有7个实现类,比较重要的实现类有:ArrayBlockingQueue:由数组结构组成的有界阻塞队列LinkedBlockingQueue:由链表结构组成的有界(但大小默认值为integer.MAX_VALUE)阻塞队列PriorityBlockingQueue:支持优先级排序的无界阻塞队列,其他几个实现类如下:

    在这里插入图片描述

    第十三集:线程池的三大方法

    1、newFixedThreadPool(n)代码:

    public class Test {
        public static void main(String[] args) throws Exception {
            ExecutorService threadPool = Executors.newFixedThreadPool(3);
            try {
                for (int i = 1; i <= 5; i++) {
                    int finalI = i;
                    threadPool.submit(() -> {
                        System.out.println(Thread.currentThread().getName() + "办理" + finalI + "号业务");
                    });
                }
            } finally {
                threadPool.shutdown();
            }
        }
    }
    

    newFixedThreadPool(n)结果:

    pool-1-thread-1办理1号业务
    pool-1-thread-2办理2号业务
    pool-1-thread-1办理4号业务
    pool-1-thread-2办理5号业务
    pool-1-thread-3办理3号业务
    

    newFixedThreadPool(n)解释:

    相当于三个员工办理5个业务,无论刮风下雨,3个员工始终在岗,底层代码:

    在这里插入图片描述
    另外Executor可以用来创建线程池,而ExecutorService是该接口的子接口,相比来说,子接口功能更加强大,因此我们使用子接口ExecutorService,另外submit()方法是可以有返回值的,而execute()方法没有返回值,不过他们都可以提交任务

    2、newSingleThreadExecutor()代码:

    使用

    ExecutorService threadPool = Executors.newSingleThreadExecutor();
    

    代替:

    ExecutorService threadPool = Executors.newFixedThreadPool(3);
    

    其他的代码和newFixedThreadPool(n)中的代码一致

    newSingleThreadExecutor()结果:

    pool-1-thread-1办理1号业务
    pool-1-thread-1办理2号业务
    pool-1-thread-1办理3号业务
    pool-1-thread-1办理4号业务
    pool-1-thread-1办理5号业务
    

    newSingleThreadExecutor()解释:

    相当于银行始终只有一个员工,底层代码:

    在这里插入图片描述

    3、newCachedThreadPool()代码:

    使用

    ExecutorService threadPool = Executors.newCachedThreadPool();
    

    代替:

    ExecutorService threadPool = Executors.newFixedThreadPool(3);
    

    其他的代码和newFixedThreadPool(n)中的代码一致

    newCachedThreadPool()结果:

    pool-1-thread-2办理2号业务
    pool-1-thread-3办理3号业务
    pool-1-thread-4办理4号业务
    pool-1-thread-1办理1号业务
    pool-1-thread-5办理5号业务
    

    newCachedThreadPool()解释:

    银行的有Integer.MAX_VALUE个员工(Integer.MAX_VALUE即2147483647),但是没有默认在岗员工,只要有顾客需要,那这些员工都可以到岗,如果不需要这么多,他们自然不会到岗,底层代码:

    在这里插入图片描述

    第十四集:ThreadPoolExecutor的底层原理

    对于以上线程池的三大方法,在工作中我们一个也不用,他们都是具有缺陷的,在上面我也给大家展示了它的底层,他们都是new ThreadPoolExecutor(),其中ThreadPoolExecutor实现了AbstractExecutorService,而AbstractExecutorService是ExecutorService接口的子接口,因此我们可以来新建ThreadPoolExecutor对象,我们暂且不说我们此时看到的ThreadPoolExecutor类的构造方法中的5个参数,因为该类还有一个构造方法里面有7个参数,当使用线程池的时候,我们使用的是这7个参数的,我们首先看一下它长什么样子,然后在说明里面参数的含义:

    在这里插入图片描述

    现在来解释这7个参数的含义:
    在这里插入图片描述
    现在来更细致的解释这几个参数:

    corePoolSize(核心线程数):假设该值是3的话,说明线程池中至少有3个线程始终保持活跃,相当于银行窗口的默认值守员工,无论银行有没有顾客,窗口最少有三个员工在值守

    maximumPoolSize(最大线程数):假设该值是10的话,说明线程池中最多有10个线程同时保持活跃,相当于银行最多有10个窗口

    keepAliveTime、unit(存活时间):假设keepAliveTime是2,unit是TimeUnit.HOURS,说明在2个小时之内还用不到大于corePoolSize数量的这些线程的话,那就会销毁这些多余的线程,相当于银行在2个小时之内都只用不到默认的3个窗口办公就行,那只留下3个员工值守就行,其他的员工可以回家休息了

    workQueue(阻塞队列):如果核心线程池中的线程都被用了,那多余的任务f就需要先到阻塞队列中等待,相当于银行的窗口都被顾客用着,那在来的顾客只能去候客厅等着了

    threadFactory(创建线程工厂):创建线程的工厂,毕竟核心线程数不够用了,那还需要创建线程,这就使用到了创建线程的工厂,相当于大堂经理一看人太多了需要增加窗口员工

    handler(拒绝策略):如果任务太多时候的处理办法,一共有4种处理办法,当核心线程数中的线程都被用了,那其他任务先到阻塞队列中,然后创建线程的工厂创建新的线程,一直到达了最大线程数,但是人还是很多,最后阻塞队列也满了,这个时候就需要使用到拒绝策略来处理多余的那些任务

    现在来使用通俗易懂的例子来解释这7个参数是怎么回事,假设银行每天默认有3个窗口在开放,无论有没有顾客都有这三个窗口都必须开放,这3个窗口就相当于corePoolSize;假设顾客超过了3个,那就需要先去候客厅排号等待,候客厅相当于workQueue;当候客厅已经满了,大堂经理一看这可不行,得赶快加窗口,然后他打电话让几个员工来加班,打开窗口就相当于threadFactory;然后顾客一点点的少了下来,只用3个窗口就够用了,又等了两个小时,还不需要那么多窗口,然后大堂经理就告诉加班的员工可以回去了,这2个小时就相当于keepAliveTime、unit;然后等了一会来了很多顾客,大堂经理一看这得把全部窗口打开,让员工都加班,这10个窗口相当于maximumPoolSize;这个时候候客厅人数得到了缓解,但随着时间的推移,人越来越多,窗口全部打开并且占满着,候客厅也完全满了,外边还有顾客等着进来,大堂经理一看这可不行,不能让顾客等那么久,然后他就去门口告诉那些还要进来的顾客们人已经满了,对他们进行拒绝,这就是handler;

    接下来我将说一下maximumPoolSize怎么设置,老师说了需要考虑当前的程序是CPU密集型还是IO密集型,如果是CPU密集型,就需要改成当前电脑或者服务器核数+1或者+2,总结如下:
    在这里插入图片描述

    如果是IO密集型处理起来就比较麻烦,因为IO密集型任务线程并不是一直执行任务,所以应该配置尽可能多的线程,比如CPU核数*2,以上IO密集型配置方法是一个笼统的说法,我们还可以采用某大厂用过的方式,总结如下:
    在这里插入图片描述

    然后我说一下如何使用代码来获取当前电脑或者服务器的CPU核数:

    int coreNum = Runtime.getRuntime().availableProcessors();
    

    等会我编写代码演示这7个参数使用的时候会具体用到这个方法,不用着急

    然后来说一下handler(拒绝策略),直接看图:

    在这里插入图片描述
    接下来举出一个例子说明具体如何使用:

    public class Test {
        public static void main(String[] args) throws Exception {
            ExecutorService threadPool = new ThreadPoolExecutor(
            		3,
                    Runtime.getRuntime().availableProcessors() + 1,
                    2,
                    TimeUnit.SECONDS,
                    new LinkedBlockingDeque<>(100),
                    Executors.defaultThreadFactory(),
                    new ThreadPoolExecutor.AbortPolicy());
            try {
                for (int i = 1; i <= 100; i++) {
                    int finalI = i;
                    threadPool.submit(() -> {
                        System.out.println(Thread.currentThread().getName() + "办理" + finalI + "号业务");
                    });
                }
            } finally {
                threadPool.shutdown();
            }
        }
    }
    

    其中 new LinkedBlockingDeque<>(100)是Executors中默认使用的,不过我设置了阻塞队列的大小而已,在第十三集:线程池的三大方法中写的我也截图了,现在我再来截个图:

    在这里插入图片描述
    Executors.defaultThreadFactory()、new ThreadPoolExecutor.AbortPolicy()来自于ThreadPoolExecutor.java中的构造方法中:

    在这里插入图片描述

    如果你想知道该线程池可以同时处理多少任务,你可以使用(maximumPoolSize+workQueue的大小)之和计算出来,如果同时处理的并发数大于该值,当使用new ThreadPoolExecutor.AbortPolicy()拒绝策略的时候就会出现java.util.concurrent.RejectedExecutionException,所以需要合理设计阻塞队列的大小,最大线程数量是可以计算出来的,那就相等于固定数值,主要根据并发数算出来阻塞队列的大小

    第十五集:四大函数式接口

    总览:
    在这里插入图片描述

    代码:

    public class Test {
        public static void main(String[] args) throws Exception {
            // Consumer:有输入没有返回值,泛型就是输入值的类型
            Consumer<String> consumer = t -> System.out.println(t);
            consumer.accept("hello world!");
            // Supplier:没有输入没有返回值,泛型就是返回值的类型
            Supplier<String> supplier = ()->{return "hello world!";};
            System.out.println(supplier.get());
            // Function:有输入有返回值,第一个泛型就是输入值的类型,第二个参数是返回值的类型
            Function<String,Integer> function = t->{return t.length();};
            System.out.println(function.apply("hello world!"));
            // Predicate:有输入有返回值,泛型就是输入值的类型,返回值是布尔类型
            Predicate<String> predicate = t->{return t.length()>0;};
            System.out.println(predicate.test("hello world!"));
        }
    }
    

    结果:

    hello world!
    hello world!
    12
    true
    

    总结:

    这四个函数式接口在流式计算中很常用

    第十六集:流式编程

    代码:

    public class Test {
        public static void main(String[] args) throws Exception {
            User u1 = new User(11, "a", 23);
            User u2 = new User(12, "b", 24);
            User u3 = new User(13, "c", 22);
            User u4 = new User(14, "d", 28);
            User u5 = new User(16, "e", 26);
            List<User> list = Arrays.asList(u1, u2, u3, u4, u5);
            list.stream().filter(u -> u.getId() % 2 == 0 && u.getAge() > 24)
                    .map(u -> u.getUserName().toUpperCase())
                    .sorted((name1, name2) -> name2.compareTo(name1))
                    .limit(1)
                    .forEach(System.out::println);
        }
    
        @Data
        @AllArgsConstructor
        public static class User {
            private Integer id;
            private String userName;
            private Integer age;
        }
    }
    

    结果:

    E
    

    解释:

    • 流式计算为数组和集合服务,不为Map服务,对于集合来说,所有集合的父类Collection中有一个stream()方法,所以所有的集合对象都可以用stream方法,而数组可以通过Arrays.stream(数组名称)来把一个数组变成流,然后就可以进行流式计算;
    • stream方法的返回值是一个流,流后面的泛型可以是集合中的数据类型,例如List<User>中的User,根据default Stream<E> stream(){XXX}可知;也可以是数组中的类型,例如Integer[]中的Integer,根据public static <T> Stream<T> stream(T[] array){XXX}可知;
    • filter中用的是断定型接口Predicate,可以用来过滤,传入一个参数,但是返回一个布尔值,最终filter方法返回的是由Predicate断定型接口的输入值作为泛型的流,根据Stream<T> filter(Predicate<? super T> predicate)可知;
    • map中用的是函数型接口Function,可以用来映射,注意map()可不是集合,传入一个参数,可以返回一个任意类型的参数,最终map方法的返回值是一个由Function函数型接口的返回值作为泛型的流,根据<R> Stream<R> map(Function<? super T, ? extends R> mapper)可知;
    • sorted中用的是比较型接口Comparator,可以用来比较,需要传输两个相同类型的参数,通过比较得出返回值,例如o1.compareTo(o2)就是按照正序排列,而o2.compareTo(o1)就是按照倒序排列,返回值就是比较型接口输入值的类型作为泛型的流,根据Stream<T> sorted(Comparator<? super T> comparator)可知
    • limit中用的是一个long型的数字,返回的还是调用limit()方法的流类型
    • forEach中用的是消费型接口Consumer,可以用来输出,没有返回值,只需要传输一个参数就可以了,根据void forEach(Consumer<? super T> action)可知
    • collect(Collectors.toList())叫做收集,一般用在最后将流变成一个List集合
    • toArray()也叫做收集,一般用在最后将流变成一个数组,但是只能是Object类型的
    • 除此之外还有count()distinct()empty()generate(Supplier<T> s)mapToInt(ToIntFunction<? super T> mapper),但是有可能使用的不是我们平常使用的方法,例如:
      在这里插入图片描述

    第十七集:ForkJoin例子

    代码:

    class MyTask extends RecursiveTask<Integer> {
        private static final Integer ADJUST_VALUE = 10;
        private int begin;
        private int end;
        private int result;
    
        public MyTask(int begin, int end) {
            this.begin = begin;
            this.end = end;
        }
    
        @Override
        protected Integer compute() {
            if ((end - begin) <= ADJUST_VALUE) {
                for (int i = begin; i <= end; i++) {
                    result = result + i;
                }
            } else {
                int middle = (begin + end) / 2;
                MyTask task01 = new MyTask(begin, middle);
                MyTask task02 = new MyTask(middle + 1, end);
                task01.fork();
                task02.fork();
                result = task01.join() + task02.join();
            }
            return result;
        }
    }
    
    /**
     *  * 分支合并例子
     *  * ForkJoinPool
     *  * ForkJoinTask
     *  * RecursiveTask
     *  
     */
    public class Test {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            MyTask myTask = new MyTask(0, 100);
            ForkJoinPool forkJoinPool = new ForkJoinPool();
            ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(myTask);
            System.out.println(forkJoinTask.get());
            forkJoinPool.shutdown();
        }
    }
    

    结果:

    5050
    

    解释:

    Fork:把一个复杂任务进行分拆,大事化小
    Join:把分拆任务的结果进行合并
    在这里插入图片描述

    第十八集:CompletableFuture例子

    代码:

    public class Test {
        // 创建线程池,简单起见就随便创建一个,也不用那7个参数了
        public static final ExecutorService executor = Executors.newFixedThreadPool(3);
    
        public static void main(String[] args) throws Exception {
            // 注意:使用我们自己创建的线程池去做事情,所以1和3都不用,由于我们没有关闭线程池,所以程序不会停,不要怀疑
    
            // 1、没有返回值的异步回调(Void仅有的一个无参构造方法已经被私有化了,相当于没有返回值),不使用自己的线程池
            System.out.println("***1、没有返回值的异步回调(Void仅有的一个无参构造方法已经被私有化了),不使用自己的线程池***");
            CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(() -> {
                System.out.println(Thread.currentThread().getName() + "》completableFuture1中的代码执行");
            });
    
            // 2、没有返回值的异步回调,使用自己的线程池
            System.out.println("***2、没有返回值的异步回调,使用自己的线程池***");
            CompletableFuture<Void> completableFuture2 = CompletableFuture.runAsync(() -> {
                System.out.println(Thread.currentThread().getName() + "》completableFuture2中的代码执行");
            }, executor);
    
            // 3、用于测试whenComplete()、exceptionally(),然后有返回值的异步回调,不使用自己的线程池
            System.out.println("***3、有返回值的异步回调,不使用自己的线程池***");
            CompletableFuture<Integer> completableFuture3 = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread().getName() + "》completableFuture3中的代码执行");
                int i = 10 / 0;
                return 1024;
            });
            System.out.println(completableFuture3.whenComplete((result, exception) -> {
                System.out.println("whenComplete中的返回值 = " + result);
                System.out.println("whenComplete中的异常 = " + exception);
            }).exceptionally(exception -> {
                System.out.println("exceptionally中的异常 = " + exception.getMessage());
                // 假设出现异常就返回0
                return 0;
            }).get());
    
            // 4、用于测试whenComplete()、exceptionally(),然后有返回值的异步回调,使用自己的线程池
            System.out.println("***4、有返回值的异步回调,使用自己的线程池***");
            CompletableFuture<Integer> completableFuture4 = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread().getName() + "》completableFuture4中的代码执行");
                int i = 10 / 0;
                return 1024;
            }, executor);
    
            // 如果completableFuture4中不出错,只会执行whenComplete()中的内容,而不会执行exceptionally()中的内容;
            // 如果completableFuture4中出错,不仅会执行,whenComplete()中的内容,而且会执行exceptionally()中的内容;
            // 无论是否出错,都会等着completableFuture4中的代码执行完成,才会执行下面的whenComplete()或者exceptionally()方法,
            // 如果completableFuture4中的代码没有执行完成,那会阻塞在这里,等待着代码执行完成
            System.out.println(completableFuture4.whenComplete((result, exception) -> {
                System.out.println("whenComplete中的返回值 = " + result);
                System.out.println("whenComplete中的异常 = " + exception);
            }).exceptionally(exception -> {
                System.out.println("exceptionally中的异常 = " + exception.getMessage());
                // 假设出现异常就返回0
                return 0;
            }).get());
    
            // 5、用于测试handle(),相比4的做法,本次直接在handle()中进行返回值处理以及异常处理了,
            //   不需要用到whenComplete和exceptionally两个方法了,直接通过get()方法获取的就是返回值
            System.out.println("***5、有返回值的异步回调,使用自己的线程池,相比4的做法,本次直接在handle()中进行返回值处理以及异常处理***");
            CompletableFuture<Integer> completableFuture5 = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread().getName() + "》completableFuture5中的代码执行");
                int i = 10 / 0;
                return 1024;
            }, executor).handle((result, exception) -> {
                if (exception == null) {
                    return result;
                }
                return 0;
            });
            System.out.println(completableFuture5.get());
    
            // 6、用于测试thenRunAsync(),不能接收上一步的返回值,不能返回结果,只有成功才能执行thenRunAsync()
            System.out.println("***6、用于测试thenRunAsync(),不能接收上一步的返回值,不能返回结果***");
            CompletableFuture<Void> completableFuture6 = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread().getName() + "》completableFuture6中的代码执行");
                return 1024;
            }, executor).thenRunAsync(() -> {
                System.out.println("thenRunAsync成功执行了");
            }, executor);
            System.out.println(completableFuture6.get());
    
            // 7、用于测试thenAcceptAsync(),可以接收上一步的返回值,不能返回结果,只有成功才能执行thenAcceptAsync()
            System.out.println("***7、用于测试thenAcceptAsync(),可以接收上一步的返回值,不能返回结果,只有成功才能执行thenAcceptAsync()***");
            CompletableFuture<Void> completableFuture7 = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread().getName() + "》completableFuture7中的代码执行");
                return 1024;
            }, executor).thenAcceptAsync(result -> {
                System.out.println("thenAcceptAsync接收到返回值=" + result);
            }, executor);
            System.out.println(completableFuture7.get());
    
            // 8、用于测试thenAcceptAsync(),可以接收上一步的返回值,不能返回结果,只有成功才能执行thenAcceptAsync()
            System.out.println("***8、用于测试thenAcceptAsync(),可以接收上一步的返回值,不能返回结果,只有成功才能执行thenAcceptAsync()***");
            CompletableFuture<Integer> completableFuture8 = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread().getName() + "》completableFuture8中的代码执行");
                return 1024;
            }, executor).thenApplyAsync(result -> {
                System.out.println("thenAcceptAsync接收到返回值=" + result);
                return result + 1;
            }, executor);
            System.out.println(completableFuture8.get());
    
            CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread().getName() + "》future1执行");
                return 1024;
            });
            CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread().getName() + "》future2执行");
                return "hello";
            });
    
            CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread().getName() + "》future3执行");
                return 2021;
            });
            // 9、测试两个组合任务,只有future1和future2完成之后才会执行该任务,thenCombineAsync方法可以接收future1和future2的返回值,另外该方法还可以返回一个值
            System.out.println("***9、测试两个组合任务,只有future1和future2完成之后才会执行该任务,thenCombineAsync方法可以接收future1和future2的返回值,另外该方法还可以返回一个值***");
            CompletableFuture<String> completableFuture9 = future1.thenCombineAsync(future2, (future1Result, future2Result) -> {
                System.out.println(Thread.currentThread().getName() + "》completableFuture9执行");
                return future1Result + "和" + future2Result;
            }, executor);
            System.out.println(completableFuture9.get());
    
            // 10、测试两个组合任务,只有future1和future2完成之后才会执行该任务,thenAcceptBothAsync方法可以接收future1和future2的返回值,但是不会返回值
            System.out.println("***10、测试两个组合任务,只有future1和future2完成之后才会执行该任务,thenAcceptBothAsync方法可以接收future1和future2的返回值,但是不会返回值***");
            CompletableFuture<Void> completableFuture10 = future1.thenAcceptBothAsync(future2, (future1Result, future2Result) -> {
                System.out.println(Thread.currentThread().getName() + "》completableFuture10执行");
            }, executor);
    
            // 11、测试两个组合任务,只有future1和future2完成之后才会执行该任务,runAfterBothAsync方法不接收future1和future2的返回值,并且不会返回值
            System.out.println("***11、测试两个组合任务,只有future1和future2完成之后才会执行该任务,runAfterBothAsync方法不接收future1和future2的返回值,并且不会返回值***");
            CompletableFuture<Void> completableFuture11 = future1.runAfterBothAsync(future2, () -> {
                System.out.println(Thread.currentThread().getName() + "》completableFuture11执行");
            }, executor);
    
            // 12、测试两个组合任务,只要future1和future3有一个完成之后就会执行该任务,applyToEitherAsync方法可以接收future1或者future3的返回值,并且会返回值,另外要求future1和future3返回值类型一致,当然你可以都用Object
            System.out.println("***12、测试两个组合任务,只要future1和future3有一个完成之后就会执行该任务,applyToEitherAsync方法可以接收future1或者future3的返回值,并且会返回值,另外要求future1和future3返回值类型一致***");
            CompletableFuture<Integer> completableFuture12 = future1.applyToEitherAsync(future3, futureResult -> {
                System.out.println(Thread.currentThread().getName() + "》completableFuture12执行");
                return futureResult;
            }, executor);
            System.out.println(completableFuture12.get());
    
            // 13、测试两个组合任务,只要future1和future3有一个完成之后就会执行该任务,acceptEitherAsync方法可以接收future1或者future3的返回值,但是不会返回值,另外要求future1和future3返回值类型一致,当然你可以都用Object
            System.out.println("***13、测试两个组合任务,只要future1和future3有一个完成之后就会执行该任务,acceptEitherAsync方法可以接收future1或者future3的返回值,但是不会返回值,另外要求future1和future3返回值类型一致***");
            CompletableFuture<Void> completableFuture13 = future1.acceptEitherAsync(future3, futureResult -> {
                System.out.println(Thread.currentThread().getName() + "》completableFuture13执行");
            }, executor);
    
            // 14、测试两个组合任务,只要future1和future2有一个完成之后就会执行该任务,runAfterEitherAsync方法不会接收future1或者future3的返回值,也不会返回值,不要求future1和future2返回值类型一致
            System.out.println("***14、测试两个组合任务,只要future1和future2有一个完成之后就会执行该任务,runAfterEitherAsync方法不会接收future1或者future3的返回值,也不会返回值,不要求future1和future2返回值类型一致***");
            CompletableFuture<Void> completableFuture14 = future1.runAfterEitherAsync(future2, () -> {
                System.out.println(Thread.currentThread().getName() + "》completableFuture14执行");
            }, executor);
    
            // 15、测试多个组合任务,只有全部任务完成才算完成
            System.out.println("***15、测试多个组合任务,只有全部任务完成才算完成***");
            CompletableFuture<Void> completableFuture15 = CompletableFuture.allOf(future1, future2, future3);
            completableFuture15.get();
            System.out.println("多组合任务完成,最终future1.get()=" + future1.get() + "、future2.get()=" + future2.get() + "、future3.get()=" + future3.get());
    
            // 16、测试多个组合任务,只要有一个完成就算完成
            System.out.println("***16、测试多个组合任务,只要有一个完成就算完成***");
            CompletableFuture<Object> completableFuture16 = CompletableFuture.anyOf(future1, future2, future3);
            completableFuture16.get();
            System.out.println("多组合任务完成,最终completableFuture16.get()=" + completableFuture16.get());
        }
    }
    

    结果:

    ***1、没有返回值的异步回调(Void仅有的一个无参构造方法已经被私有化了),不使用自己的线程池***
    ***2、没有返回值的异步回调,使用自己的线程池***
    ForkJoinPool.commonPool-worker-1》completableFuture1中的代码执行
    ***3、有返回值的异步回调,不使用自己的线程池***
    pool-1-thread-1》completableFuture2中的代码执行
    ForkJoinPool.commonPool-worker-1》completableFuture3中的代码执行
    whenComplete中的返回值 = null
    whenComplete中的异常 = java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
    exceptionally中的异常 = java.lang.ArithmeticException: / by zero
    0
    ***4、有返回值的异步回调,使用自己的线程池***
    pool-1-thread-2》completableFuture4中的代码执行
    whenComplete中的返回值 = null
    whenComplete中的异常 = java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
    exceptionally中的异常 = java.lang.ArithmeticException: / by zero
    0
    ***5、有返回值的异步回调,使用自己的线程池,相比4的做法,本次直接在handle()中进行返回值处理以及异常处理***
    pool-1-thread-3》completableFuture5中的代码执行
    0
    ***6、用于测试thenRunAsync(),不能接收上一步的返回值,不能返回结果***
    pool-1-thread-1》completableFuture6中的代码执行
    thenRunAsync成功执行了
    null
    ***7、用于测试thenAcceptAsync(),可以接收上一步的返回值,不能返回结果,只有成功才能执行thenAcceptAsync()***
    pool-1-thread-3》completableFuture7中的代码执行
    thenAcceptAsync接收到返回值=1024
    null
    ***8、用于测试thenAcceptAsync(),可以接收上一步的返回值,不能返回结果,只有成功才能执行thenAcceptAsync()***
    pool-1-thread-2》completableFuture8中的代码执行
    thenAcceptAsync接收到返回值=1024
    1025
    ForkJoinPool.commonPool-worker-1》future1执行
    ForkJoinPool.commonPool-worker-1》future2执行
    ***9、测试两个组合任务,只有future1和future2完成之后才会执行该任务,thenCombineAsync方法可以接收future1和future2的返回值,另外该方法还可以返回一个值***
    ForkJoinPool.commonPool-worker-1》future3执行
    pool-1-thread-1》completableFuture9执行
    1024和hello
    ***10、测试两个组合任务,只有future1和future2完成之后才会执行该任务,thenAcceptBothAsync方法可以接收future1和future2的返回值,但是不会返回值***
    ***11、测试两个组合任务,只有future1和future2完成之后才会执行该任务,thenCombineAsync方法可以接收future1和future2的返回值,但是不会返回值***
    pool-1-thread-2》completableFuture10执行
    ***12、测试两个组合任务,只要future1和future3有一个完成之后就会执行该任务,thenCombineAsync方法可以接收future1或者future3的返回值,但是会返回值,另外要求future1和future3返回值类型一致***
    pool-1-thread-3》completableFuture11执行
    pool-1-thread-1》completableFuture12执行
    1024
    ***13、测试两个组合任务,只要future1和future3有一个完成之后就会执行该任务,acceptEitherAsync方法可以接收future1或者future3的返回值,但是不会返回值,另外要求future1和future3返回值类型一致***
    ***14、测试两个组合任务,只要future1和future2有一个完成之后就会执行该任务,runAfterEitherAsync方法不会接收future1或者future3的返回值,也不会返回值,不要求future1和future2返回值类型一致***
    pool-1-thread-2》completableFuture13执行
    ***15、测试多个组合任务,只有全部任务完成才算完成***
    pool-1-thread-3》completableFuture14执行
    多组合任务完成,最终future1.get()=1024、future2.get()=hello、future3.get()=2021
    ***16、测试多个组合任务,只要有一个完成就算完成***
    多组合任务完成,最终completableFuture16.get()=1024
    

    解释:
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

    第十九集:Volatile关键字

    代码:

    public class Test {
        public static void main(String[] args) throws Exception {
            Demo demo = new Demo();
            new Thread(demo).start();
            while (true) {
                if (demo.isFlag()) {
                    System.out.println("______________");
                    break;
                }
            }
        }
    }
    
    class Demo implements Runnable {
        private boolean flag = false;
    
        @Override
        public void run() {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            flag = true;
            System.out.println("flag=" + isFlag());
        }
    
        public boolean isFlag() {
            return this.flag;
        }
    
        public void setFlag(boolean flag) {
            this.flag = flag;
        }
    }
    

    结果:

    出现flag=true之后线程就阻塞了

    解释:

    可以看到有主线程和分支线程一共两个线程,其实flag存在于主存中,然后分支线程运行的时候会把flag读入到当前线程的缓存中,之后主线程也会把flag读入当前线程的的缓存中,当然他们读到的都是false,然后分支线程把flag改成了true,由于while是一个很快的操作,所以此时主线程不知道flag已经发生改变了,那么通过demo.isFlag()获取的一直就是false,因此线程不会结束,例如:

    在这里插入图片描述
    如果想解决这个问题,我们需要把

    private boolean flag = false;
    

    变成

    private volatile boolean flag = false;
    

    这样对flag的所有操作都只在主存中进行了,其他的操作该在缓存中还在缓存中进行,因此上面的那幅图就可以这样画:
    在这里插入图片描述
    总结:

    1. 可以保证内存中的数据可见性,也就是在多个线程操作共享数据的时候,相当于这些操作都在主存中进行,和各个线程的缓存没有关系
    2. volatile不具备互斥性质,也就是说被对该数据的操作虽然在主存中进行,但是依然是多个线程同时可以对该数据进行读写,当然这也可以说成是原子性
    3. volatile可以禁止指令重排,指令重排就是对象创建的的时候有几个步骤,这几个步骤要按照顺序来,不能反了,由于要提高效率,所以默认是反的,而volatile的作用就是不让反

    第二十集:原子类

    代码:

    public class Test {
        public static void main(String[] args) throws Exception {
            Demo demo = new Demo();
            for (int i = 0; i < 10; i++) {
                new Thread(demo).start();
            }
        }
    }
    
    class Demo implements Runnable {
        private int serialNumber = 0;
    
        @Override
        public void run() {
            // 避免出不来原子性
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 打印出数字
            System.out.println(getSerialNumber());
        }
        // 执行加1操作
        public int getSerialNumber() {
            return this.serialNumber++;
        }
    }
    

    结果:

    多个相同数字出现,说明修改的时候发生了冲突

    改编代码:

    public class Test {
        public static void main(String[] args) throws Exception {
            Demo demo = new Demo();
            for (int i = 0; i < 10; i++) {
                new Thread(demo).start();
            }
        }
    }
    
    class Demo implements Runnable {
        // AtomicInteger是原子类,里面有volatile关键字,可以保证内存中的数据可见性
        private AtomicInteger serialNumber = new AtomicInteger();
    
        @Override
        public void run() {
            // 避免出不来原子性
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 打印出数字
            System.out.println(getSerialNumber());
        }
    
        // 执行加1操作
        private int getSerialNumber() {
            return this.serialNumber.getAndIncrement();
        }
    }
    

    结果:

    所有数字都是不相同的

    解释:

    本次使用的AtomicInteger是java.util.concurrent.atomic中的类,这里面的类可以保证数据可见性和原子性操作,具体解释如下:

    1. 使用volatile关键字保证了数据的可见性,相当于对于数据的操作都在主存中进行,不涉及到单个线程中缓存的事情
    2. JVM在里面使用了CAS算法(Compare-And-Swap),其中CAS算法可以保证数据的原子性,它是硬件对于并发操作共享数据的支持,CAS中包含了三个操作数,分别是内存值V(比较之前从主存中取出来的value值)、预估值A(比较之时从主存中取出来的value值)、更新值B(即我们想设置的值,如果通过比较发现内存值V和预估值A相等,就会将主存中的value值改成更新值B),解释这三个值之后我说一下如何进行数据更新,当且仅当内存值V==预估值A时,才将内存中的value值即内存值V设置成更新值B,否则更新失败,然后就执行循环操作,也就是继续进行更新操作,直到更新成功,这种做法比使用synchronized锁的效率更高,因为如果更新失败的话,该线程会立即继续进行尝试更新操作,这样不会浪费CPU的时间片(执行权),所以效率更高,至于CAS图解如下:
      在这里插入图片描述
    展开全文
  • Java NIO(New IO或 Non Blocking IO)是从Java 1.4版本开始引入的一个新的IO API,可以替代标准的Java IO API。NIO支持面向缓冲区的、基于通道的IO操作。NIO将以更加高效的方式进行文件的读写操作。 java IO 与 ...
    Java NIO(New IO或 Non Blocking IO)是从Java 1.4版本开始引入的一个新的IO API,可以替代标准的Java IO API。NIO支持面向缓冲区的、基于通道的IO操作。NIO将以更加高效的方式进行文件的读写操作。
    

    java IO 与 java NIO 的区别
    在这里插入图片描述

    通道与流的不同之处在于通道是双向的。而流只是在一个方向上移动(一个流必须是 InputStream 或者 OutputStream 的子类), 而 通道 可以用于读、写或者同时用于读写。
    因为它们是双向的,所以通道可以比流更好地反映底层操作系统的真实情况。特别是在 UNIX 模型中,底层操作系统通道是双向的。

    一、 通道(Channel)和缓冲区(Buffer)

    Java NIO系统的核心在于:通道和缓冲区。通道表示打开到IO设备(例如:文件、套接字)的连接。

    若需要使用 NIO 系统,需要获取用于连接 IO 设备的通道以及用于容纳数据的缓冲区。然后操作缓冲区,对数据进行处理。

    简而言之,Channel 负责传输, Buffer 负责存储

    1.1 缓冲区(Buffer)

    缓冲区(Buffer) :一个用于特定基本数据类型的容器。缓冲区的底层用的是数组,根据传输数据类型的不同,java为我们提供了相应类型的缓冲区(基本数据类型除了boolean都有)。这些缓冲区的管理方式基本一致,都是通过allocate()方法获取一个缓冲区。

    // 分配一个指定大小的缓冲区
    ByteBuffer buf = ByteBuffer.allocate(1024);
    

    由 java.nio 包定义的,所有缓冲区都是 Buffer 抽象类的子类。
    Java NIO 中的 Buffer 主要用于与 NIO 通道进行交互,数据是从通道读入缓冲区,从缓冲区写入通道中的。

    Buffer 的常用方法:
    在这里插入图片描述
    **加粗样式**

    /*
     * 一、缓冲区(Buffer):在java NIO 中负责数据的存储。缓冲区就是数组。用于存储不同类型的数据。
     * 
     * 根据数据类型的不同(boolean 除外),有以下 Buffer 常用子类:
     * ByteBuffer
     * CharBuffer
     * ShortBuffer
     * IntBuffer
     * LongBuffer
     * FloatBuffer
     * DoubleBuffer
     * 
     * 上述缓冲区的管理方式几乎一致,通过allocate()获取缓冲区
     * 
     * 二、缓冲区存取数据的两个核心方法:
     * put():存入数据到缓冲区中
     *       put(byte b):将给定单个字节写入缓冲区的当前位置
     *       put(byte[] src):将 src 中的字节写入缓冲区的当前位置
     *       put(int index, byte b):将指定字节写入缓冲区的索引位置(不会移动 position)
     * get():获取缓存区中的数据
     *       get() :读取单个字节
     *       get(byte[] dst):批量读取多个字节到 dst 中
     *       get(int index):读取指定索引位置的字节(不会移动 position)
     *       
     * 三、缓冲区中的四个核心属性:
     * capacity:容量,表示缓冲区中最大存储数据的容量。一旦声明不能改变。
     * limit:界限,表示缓冲区中可以操作数据的大小。(limit后数据不能进行读写)
     * position:位置,表示缓冲区中正在操作数据的位置。
     * mark: 标记,表示记录当前position位置。可以通过reset()恢复到mark的位置。
     * 
     * 0<=mark<=position<=limit<=capacity
     * 
     */
    public class TestBuffer {
        @Test
        public void test1(){
            String str="abcde";
    
            //1.分配一个指定大小的缓冲区
            ByteBuffer buf=ByteBuffer.allocate(1024);
    
            System.out.println("--------------allocate()----------------");
            System.out.println(buf.position());//0
            System.out.println(buf.limit());//1024
            System.out.println(buf.capacity());//1024
    
            //2.利用put()存放数据到缓冲区中
            buf.put(str.getBytes());
    
            System.out.println("-------------put()-------------");
            System.out.println(buf.position());//5
            System.out.println(buf.limit());//1024
            System.out.println(buf.capacity());//1024
    
            //3.切换读取数据模式
            buf.flip();
            System.out.println("--------------flip()------------");
            System.out.println(buf.position());//0
            System.out.println(buf.limit());//5
            System.out.println(buf.capacity());//1024
    
            //4.利用get()读取缓冲区中的数据
            byte[] dst=new byte[buf.limit()];
            buf.get(dst);
            System.out.println(new String(dst,0,dst.length));//abcd
    
            System.out.println("--------------get()------------");
            System.out.println(buf.position());//5
            System.out.println(buf.limit());//5
            System.out.println(buf.capacity());//1024
    
            //5.rewind():可重复读
            buf.rewind();
    
            System.out.println("--------------rewind()------------");
            System.out.println(buf.position());//0
            System.out.println(buf.limit());//5
            System.out.println(buf.capacity());//1024
    
            //6.clear():清空缓冲区。但是缓冲区中的数据依然存在,但是处在“被遗忘”状态
            buf.clear();
    
            System.out.println("--------------clear()------------");
            System.out.println(buf.position());//0
            System.out.println(buf.limit());//1024
            System.out.println(buf.capacity());//1024
    
            System.out.println((char)buf.get());
        }
    
        @Test
        public void test2(){
            String str="abcde";
    
            ByteBuffer buf=ByteBuffer.allocate(1024);
    
            buf.put(str.getBytes());
    
            buf.flip();
    
            byte[] dst=new byte[buf.limit()];
            buf.get(dst,0,2);
            System.out.println(new String(dst,0,2));//ab
            System.out.println(buf.position());//2
    
            //mark():标记
            buf.mark();
    
            buf.get(dst,2,2);//再读两个位置
            System.out.println(new String(dst, 2, 2));//cd
            System.out.println(buf.position());//4
    
            //reset():恢复到mark的位置
            buf.reset();
            System.out.println(buf.position());//2
    
            //判断缓冲区中是否还有剩余数据
            if(buf.hasRemaining()){
                //获取缓冲区中可以操作的数量
                System.out.println(buf.remaining());//3
            }
        }
    }
    

    非直接缓冲区

    NIO通过通道连接磁盘文件与应用程序,通过缓冲区存取数据进行双向的数据传输。物理磁盘的存取是操作系统进行管理的,与物理磁盘的数据操作需要经过内核地址空间;而我们的Java应用程序是通过JVM分配的缓冲空间。有点雷同于一个属于核心态,一个属于应用态的意思,而数据需要在内核地址空间和用户地址空间,在操作系统和JVM之间进行数据的来回拷贝,无形中增加的中间环节使得效率与后面要提的之间缓冲区相比偏低。
    **加粗样式**

    直接缓冲区

    直接缓冲区则不再通过内核地址空间和用户地址空间的缓存数据的复制传递,而是在物理内存中申请了一块空间,这块空间映射到内核地址空间和用户地址空间,应用程序与磁盘之间的数据存取之间通过这块直接申请的物理内存进行。
    **加粗样式**

    /*
     * 四、直接缓冲区与非直接缓冲区:
     * 非直接缓冲区:通过allocate()方法分配缓冲区,将缓冲区建立在JVM的内存中。
     *            
     * 直接缓冲区:通过allocateDirect()方法分配直接缓冲区,将缓冲区建立在物理内存中。可以提高效率
     *          此方法返回的缓冲区进行分配和取消分配所需成本通常高于非直接缓冲区 。
     *          直接缓冲区的内容可以驻留在常规的垃圾回收堆之外.
     *          将直接缓冲区主要分配给那些易受基础系统的本机 I/O 操作影响的大型、持久的缓冲区。
     *          最好仅在直接缓冲区能在程序性能方面带来明显好处时分配它们。
     *          直接字节缓冲区还可以过通过FileChannel 的 map() 方法将文件区域直接映射到内存中来创建 。该方法返回MappedByteBuffer
     */
    
     @Test
        public void test3(){
            //分配直接缓冲区
    		ByteBuffer buf1=ByteBuffer.allocateDirect(1024);
            System.out.println(buf1.isDirect());//true
    		
            ByteBuffer buf=ByteBuffer.allocate(1024);
            System.out.println(buf.isDirect());//false
        }
    
    

    1.2 通道

    通道:由java.nio.channels包定义。
    Channel表示IO源与目标打开的连接。
    Channel类似于传统的“流”。但其自身不能直接访问数据,Channel只能与Buffer进行交互。

    在这里插入图片描述
    操作系统中:通道是一种通过执行通道程序管理I/O操作的控制器,它使主机(CPU和内存)与I/O操作之间达到更高的并行程度。需要进行I/O操作时,CPU只需启动通道,然后可以继续执行自身程序,通道则执行通道程序,管理与实现I/O操作。

    FileChannel的常用方法:
    **加粗样式**

    /*
     * 一、通道(Channel):用于源节点与目标节点的连接。在java NIO中负责缓冲区中数据的传输。Channel本身不存储数据,需要配合缓冲区进行传输。(就像是铁路需要火车配合一样)
     * 
     * 二、通道的主要实现类
     *    java.nio.channels.Channel 接口:
     *        |--FileChannel:用于读取、写入、映射和操作文件的通道。
     *        |--SocketChannel:通过 TCP 读写网络中的数据。
     *        |--ServerSocketChannel:可以监听新进来的 TCP 连接,对每一个新进来的连接都会创建一个 SocketChannel。
     *        |--DatagramChannel:通过 UDP 读写网络中的数据通道。
     *        
     * 三、获取通道
     * 1.java针对支持通道的类提供了getChannel()方法
     *      本地IO:
     *      FileInputStream/FileOutputStream
     *      RandomAccessFile
     *      
     *      网络IO:
     *      Socket
     *      ServerSocket
     *      DatagramSocket
     *      
     * 2.在JDK 1.7 中的NIO.2 针对各个通道提供了静态方法 open()
     * 3.在JDK 1.7 中的NIO.2 的Files工具类的newByteChannel()
     * 
     * 四、通道之间的数据传输
     * transferFrom()
     * transferTo()
     * 
     * 五、分散(Scatter)与聚集(Gather)
     * 分散读取(Scattering Reads):将通道中的数据分散到多个缓冲区中
     * 聚集写入(Gathering Writes):将多个缓冲区中的数据聚集到通道中
     * 
     * 六、字符集:Charset
     * 编码:字符串-》字符数组
     * 解码:字符数组-》字符串
     */
    public class TestChannel {
    
        // 利用通道完成文件的复制(非直接缓冲区)
        @Test
        public void test1(){
            long start=System.currentTimeMillis();
    
            FileInputStream fis=null;
            FileOutputStream fos=null;
    
            FileChannel inChannel=null;
            FileChannel outChannel=null;
            try{
                fis=new FileInputStream("d:/1.avi");
                fos=new FileOutputStream("d:/2.avi");
    
                // 1.获取通道
                inChannel=fis.getChannel();
                outChannel=fos.getChannel();
    
                // 2.分配指定大小的缓冲区
                ByteBuffer buf=ByteBuffer.allocate(1024);
    
                // 3.将通道中的数据存入缓冲区中
                while(inChannel.read(buf)!=-1){
                    buf.flip();//切换读取数据的模式
                    // 4.将缓冲区中的数据写入通道中
                    outChannel.write(buf);
                    buf.clear();//清空缓冲区
                }
            }catch(IOException e){
                e.printStackTrace();
            }finally{
                if(outChannel!=null){
                    try {
                        outChannel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if(inChannel!=null){
                    try {
                        inChannel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if(fos!=null){
                    try {
                        fos.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if(fis!=null){
                    try {
                        fis.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
            long end=System.currentTimeMillis();
            System.out.println("耗费时间:"+(end-start));//耗费时间:1094
        }
    
        //使用直接缓冲区完成文件的复制(内存映射文件)
        @Test
        public void test2() {
            long start=System.currentTimeMillis();
    
            FileChannel inChannel=null;
            FileChannel outChannel=null;
            try {
                inChannel = FileChannel.open(Paths.get("d:/1.avi"), StandardOpenOption.READ);
                outChannel = FileChannel.open(Paths.get("d:/2.avi"), StandardOpenOption.WRITE,StandardOpenOption.READ,StandardOpenOption.CREATE);
    
                //内存映射文件(只有ByteBuffer支持)
                MappedByteBuffer inMappedBuf=inChannel.map(MapMode.READ_ONLY, 0, inChannel.size());
                MappedByteBuffer outMappedBuf=outChannel.map(MapMode.READ_WRITE, 0, inChannel.size());
                //直接对缓冲区进行数据的读写操作
                byte[] dst=new byte[inMappedBuf.limit()];
                inMappedBuf.get(dst);
                outMappedBuf.put(dst);
            } catch (IOException e) {
                e.printStackTrace();
            }finally{
                if(outChannel!=null){
                    try {
                        outChannel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if(inChannel!=null){
                    try {
                        inChannel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
    
            long end=System.currentTimeMillis();
            System.out.println("耗费的时间为:"+(end-start));//耗费的时间为:200
        }
    
        //通道之间的数据传输(直接缓冲区)
        @Test
        public void test3(){
            long start=System.currentTimeMillis();
    
            FileChannel inChannel=null;
            FileChannel outChannel=null;
            try {
                inChannel = FileChannel.open(Paths.get("d:/1.avi"), StandardOpenOption.READ);
                outChannel=FileChannel.open(Paths.get("d:/2.avi"), StandardOpenOption.WRITE,StandardOpenOption.READ,StandardOpenOption.CREATE);
    
                inChannel.transferTo(0, inChannel.size(), outChannel);
                outChannel.transferFrom(inChannel, 0, inChannel.size());
            } catch (IOException e) {
                e.printStackTrace();
            }finally{
                if(outChannel!=null){
                    try {
                        outChannel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if(inChannel!=null){
                    try {
                        inChannel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
            long end=System.currentTimeMillis();
            System.out.println("耗费的时间为:"+(end-start));//耗费的时间为:147
        }
    
        //分散和聚集
        @Test
        public void test4(){
            RandomAccessFile raf1=null;
            FileChannel channel1=null;
            RandomAccessFile raf2=null;
            FileChannel channel2=null;
            try {
                raf1=new RandomAccessFile("1.txt","rw");
    
                //1.获取通道
                channel1=raf1.getChannel();
    
                //2.分配指定大小的缓冲区
                ByteBuffer buf1=ByteBuffer.allocate(100);
                ByteBuffer buf2=ByteBuffer.allocate(1024);
    
                //3.分散读取
                ByteBuffer[] bufs={buf1,buf2};
                channel1.read(bufs);
    
                for(ByteBuffer byteBuffer : bufs){
                    byteBuffer.flip();
                }
                System.out.println(new String(bufs[0].array(),0,bufs[0].limit()));
                System.out.println("--------------------");
                System.out.println(new String(bufs[1].array(),0,bufs[1].limit()));
    
                //4.聚集写入
                raf2=new RandomAccessFile("2.txt", "rw");
                channel2=raf2.getChannel();
    
                channel2.write(bufs);
    
            }catch (IOException e) {
                e.printStackTrace();
            }finally{
                if(channel2!=null){
                    try {
                        channel2.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if(channel1!=null){
                    try {
                        channel1.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if(raf2!=null){
                    try {
                        raf2.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if(raf1!=null){
                    try {
                        raf1.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
        //输出支持的字符集
        @Test
        public void test5(){
            Map<String,Charset> map=Charset.availableCharsets();
            Set<Entry<String,Charset>> set=map.entrySet();
    
            for(Entry<String,Charset> entry:set){
                System.out.println(entry.getKey()+"="+entry.getValue());
            }
        }
    
        //字符集
        @Test
        public void test6(){
            Charset cs1=Charset.forName("GBK");
    
            //获取编码器
            CharsetEncoder ce=cs1.newEncoder();
    
            //获取解码器
            CharsetDecoder cd=cs1.newDecoder();
    
            CharBuffer cBuf=CharBuffer.allocate(1024);
            cBuf.put("啦啦哈哈吧吧");
            cBuf.flip();
    
            //编码
            ByteBuffer bBuf=null;
            try {
                bBuf = ce.encode(cBuf);
            } catch (CharacterCodingException e) {
                e.printStackTrace();
            }
    
            for(int i=0;i<12;i++){
                System.out.println(bBuf.get());//-64-78-64-78-71-2-7-2-80-55-80-55
            }
    
            //解码
            bBuf.flip();
            CharBuffer cBuf2=null;
            try {
                cBuf2 = cd.decode(bBuf);
            } catch (CharacterCodingException e) {
                e.printStackTrace();
            }
            System.out.println(cBuf2.toString());//啦啦哈哈吧吧
        }
    }
    

    在这里插入图片描述

    二、NIO的非阻塞式网络通信

    传统的 IO 流都是阻塞式的。也就是说,当一个线程调用 read() 或 write()时,该线程被阻塞,直到有一些数据被读取或写入,该线程在此期间不能执行其他任务。因此,在完成网络通信进行 IO 操作时,由于线程会阻塞,所以服务器端必须为每个客户端都提供一个独立的线程进行处理,当服务器端需要处理大量客户端时,性能急剧下降。

    Java NIO 是非阻塞模式的。当线程从某通道进行读写数据时,若没有数据可用时,该线程可以进行其他任务。线程通常将非阻塞 IO 的空闲时间用于在其他通道上执行 IO 操作,所以单独的线程可以管理多个输入和输出通道。因此,NIO 可以让服务器端使用一个或有限几个线程来同时处理连接到服务器端的所有客户端。

    选择器(Selector)
    选择器(Selector) 是 SelectableChannle 对象的多路复用器,Selector 可以同时监控多个 SelectableChannel 的 IO 状况,也就是说,利用 Selector可使一个单独的线程管理多个 Channel。Selector 是非阻塞 IO 的核心。

    /*
     * 一、使用NIO 完成网络通信的三个核心:
     * 
     * 1、通道(Channel):负责连接
     *      java.nio.channels.Channel 接口:
     *           |--SelectableChannel
     *               |--SocketChannel
     *               |--ServerSocketChannel
     *               |--DatagramChannel
     *               
     *               |--Pipe.SinkChannel
     *               |--Pipe.SourceChannel
     *               
     * 2.缓冲区(Buffer):负责数据的存取
     * 
     * 3.选择器(Selector):是 SelectableChannel 的多路复用器。用于监控SelectableChannel的IO状况
     */
    public class TestBlockingNIO {//没用Selector,阻塞型的
    
        //客户端
        @Test
        public void client() throws IOException{
            SocketChannel sChannel=SocketChannel.open(new InetSocketAddress("127.0.0.1",9898));
            FileChannel inChannel=FileChannel.open(Paths.get("1.jpg"), StandardOpenOption.READ);
            ByteBuffer buf=ByteBuffer.allocate(1024);
            while(inChannel.read(buf)!=-1){
                buf.flip();
                sChannel.write(buf);
                buf.clear();
            }
            sChannel.shutdownOutput();//关闭发送通道,表明发送完毕
    
            //接收服务端的反馈
            int len=0;
            while((len=sChannel.read(buf))!=-1){
                buf.flip();
                System.out.println(new String(buf.array(),0,len));
                buf.clear();
            }
            inChannel.close();
            sChannel.close();
        }
    
        //服务端
        @Test
        public void server() throws IOException{
            ServerSocketChannel ssChannel=ServerSocketChannel.open();
            FileChannel outChannel=FileChannel.open(Paths.get("2.jpg"), StandardOpenOption.WRITE,StandardOpenOption.CREATE);
            ssChannel.bind(new InetSocketAddress(9898));
            SocketChannel sChannel=ssChannel.accept();
            ByteBuffer buf=ByteBuffer.allocate(1024);
            while(sChannel.read(buf)!=-1){
                buf.flip();
                outChannel.write(buf);
                buf.clear();
            }
    
            //发送反馈给客户端
            buf.put("服务端接收数据成功".getBytes());
            buf.flip();//给为读模式
            sChannel.write(buf);
    
            sChannel.close();
            outChannel.close();
            ssChannel.close();
        }
    }
    

    SelectionKey

    当调用 register(Selector sel, int ops) 将通道注册选择器时,选择器对通道的监听事件,需要通过第二个参数 ops 指定。
    可以监听的事件类型(用 可使用 SelectionKey 的四个常量 表示):
     读 : SelectionKey.OP_READ (1)
     写 : SelectionKey.OP_WRITE (4)
     连接 : SelectionKey.OP_CONNECT (8)
     接收 : SelectionKey.OP_ACCEPT (16)
    若注册时不止监听一个事件,则可以使用“位或”操作符连接。

    SelectionKey:表示 SelectableChannel 和 Selector 之间的注册关系。每次向选择器注册通道时就会选择一个事件(选择键)。选择键包含两个表示为整数值的操作集。操作集的每一位都表示该键的通道所支持的一类可选择操作。

    在这里插入图片描述
    Selector 的常用方法
    **加粗样式**

    public class TestNonBlockingNIO {
        //客户端
        @Test
        public void client()throws IOException{
            //1.获取通道
            SocketChannel sChannel=SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898));
            //2.切换非阻塞模式
            sChannel.configureBlocking(false);
            //3.分配指定大小的缓冲区
            ByteBuffer buf=ByteBuffer.allocate(1024);
            //4.发送数据给服务端
            Scanner scan=new Scanner(System.in);
            while(scan.hasNext()){
                String str=scan.next();
                buf.put((new Date().toString()+"\n"+str).getBytes());
                buf.flip();
                sChannel.write(buf);
                buf.clear();
            }
            //5.关闭通道
            sChannel.close();
        }
    
        //服务端
        @Test
        public void server() throws IOException{
            //1.获取通道
            ServerSocketChannel ssChannel=ServerSocketChannel.open();
    
            //2.切换非阻塞式模式
            ssChannel.configureBlocking(false);
    
            //3.绑定连接
            ssChannel.bind(new InetSocketAddress(9898));
    
            //4.获取选择器
            Selector selector=Selector.open();
    
            //5.将通道注册到选择器上,并且指定“监听接收事件”
            ssChannel.register(selector,SelectionKey.OP_ACCEPT);
    
            //6.轮询式的获取选择器上已经“准备就绪”的事件
            while(selector.select()>0){
    
                //7.获取当前选择器中所有注册的“选择键(已就绪的监听事件)”
                Iterator<SelectionKey> it=selector.selectedKeys().iterator();
    
                while(it.hasNext()){
                    //8.获取准备“就绪”的事件
                    SelectionKey sk=it.next();
    
                    //9.判断具体是什么时间准备就绪
                    if(sk.isAcceptable()){
                        //10.若“接收就绪”,获取客户端连接
                        SocketChannel sChannel=ssChannel.accept();
    
                        //11.切换非阻塞模式
                        sChannel.configureBlocking(false);
    
                        //12.将该通道注册到选择器上
                        sChannel.register(selector, SelectionKey.OP_READ);
                    }else if(sk.isReadable()){
                        //13.获取当前选择器上“读就绪”状态的通道
                        SocketChannel sChannel=(SocketChannel)sk.channel();
                        //14.读取数据
                        ByteBuffer buf=ByteBuffer.allocate(1024);
                        int len=0;
                        while((len=sChannel.read(buf))>0){
                            buf.flip();
                            System.out.println(new String(buf.array(),0,len));
                            buf.clear();
                        }
                    }
                    //15.取消选择键SelectionKey
                    it.remove();
                }
            }
        }
    }
    

    DatagramChannel

    Java NIO中的DatagramChannel是一个能收发UDP包的通道。

    public class TestNonBlockNIO2 {
        @Test
        public void send() throws IOException{
            DatagramChannel dc=DatagramChannel.open();
            dc.configureBlocking(false);
            ByteBuffer buf=ByteBuffer.allocate(1024);
            Scanner scan=new Scanner(System.in);
            while(scan.hasNext()){
                String str=scan.next();
                buf.put((new Date().toString()+"\n"+str).getBytes());
                buf.flip();
                dc.send(buf, new InetSocketAddress("127.0.0.1", 9898));
                buf.clear();
            }
            dc.close();
        }
    
        @Test
        public void receive() throws IOException{
            DatagramChannel dc=DatagramChannel.open();
            dc.configureBlocking(false);
            dc.bind(new InetSocketAddress(9898));
            Selector selector=Selector.open();
            dc.register(selector, SelectionKey.OP_READ);
            while(selector.select()>0){
                Iterator<SelectionKey> it=selector.selectedKeys().iterator();
                while(it.hasNext()){
                    SelectionKey sk=it.next();
    
                    if(sk.isReadable()){
                        ByteBuffer buf=ByteBuffer.allocate(1024);
                        dc.receive(buf)
    ;
                        buf.flip();
                        System.out.println(new String(buf.array(),0,buf.limit()));
                        buf.clear();
                    }
                }
                it.remove();
            }
        }
    }
    

    管道 (Pipe)

    Java NIO 管道是2个线程之间的单向数据连接。Pipe有一个source通道和一个sink通道。数据会被写到sink通道,从source通道读取。

    在这里插入图片描述

    public class TestPipe {
        @Test
        public void test1()throws IOException{
            //1.获取管道
            Pipe pipe=Pipe.open();
            //2.将缓冲区中的数据写入管道
            ByteBuffer buf=ByteBuffer.allocate(1024);
            Pipe.SinkChannel sinkChannel=pipe.sink();
            buf.put("通过单向管道发送数据".getBytes());
            buf.flip();
            sinkChannel.write(buf);
    
            //3.读取缓冲区中的数据
            Pipe.SourceChannel sourceChannel=pipe.source();
            buf.flip();
            int len=sourceChannel.read(buf);
            System.out.println(new String(buf.array(),0,len));
    
            sourceChannel.close();
            sinkChannel.close();
        }
    }
    

    三、NIO.2 – Path 、Paths 、Files

    Path 与 Paths

    java.nio.file.Path 接口代表一个平台无关的平台路径,描述了目录结构中文件的位置。
    Paths 提供的 get() 方法用来获取 Path 对象:Path get(String first, String … more) : 用于将多个字符串串连成路径。

    Path 常用方法:
    boolean endsWith(String path) : 判断是否以 path 路径结束 boolean
    startsWith(String path) : 判断是否以 path 路径开始
    boolean isAbsolute() :判断是否是绝对路径
    Path getFileName() : 返回与调用 Path 对象关联的文件名
    Path getName(int idx) : 返回的指定索引位置 idx 的路径名称
    int getNameCount() : 返回Path 根目录后面元素的数量
    Path getParent() :返回Path对象包含整个路径,不包含Path 对象指定的文件路径
    Path getRoot() :返回调用Path 对象的根路径
    Path resolve(Path p) :将相对路径解析为绝对路径
    Path toAbsolutePath() :
    作为绝对路径返回调用 Path 对象
    String toString() : 返回调用 Path 对象的字符串表示形式

    Files 类
    java.nio.file.Files 用于操作文件或目录的工具类。

    Files常用方法:
    Path copy(Path src, Path dest, CopyOption … how) : 文件的复制
    Path createDirectory(Path path, FileAttribute< ? > … attr) : 创建一个目录
    Path createFile(Path path, FileAttribute< ? > … arr) : 创建一个文件 void
    delete(Path path) : 删除一个文件 Path move(Path src, Path dest,
    CopyOption…how) : 将 src 移动到 dest 位置 long size(Path path) : 返回 path指定文件的大小

    Files常用方法:用于判断

    boolean exists(Path path, LinkOption … opts) : 判断文件是否存在
    boolean isDirectory(Path path, LinkOption … opts) : 判断是否是目录
    boolean isExecutable(Path path) : 判断是否是可执行文件
    boolean isHidden(Path path) : 判断是否是隐藏文件
    boolean isReadable(Path path) : 判断文件是否可读
    boolean isWritable(Path path) : 判断文件是否可写
    boolean notExists(Path path, LinkOption … opts) : 判断文件是否不存在
    public static < A extends BasicFileAttributes> A readAttributes(Path path,Class< A > type,LinkOption…options) : 获取与 path 指定的文相关联的属性。

    Files常用方法:用于操作内容

    SeekableByteChannel newByteChannel(Path path, OpenOption…how) :获取与指定文件的连接,how 指定打开方式。
    DirectoryStream newDirectoryStream(Path path) : 打开 path 指定的目录
    InputStream newInputStream(Path path, OpenOption…how):获取InputStream 对象
    OutputStream newOutputStream(Path path, OpenOption…how) : 获取 OutputStream 对象

    展开全文
  • 尚硅谷-netty笔记

    2021-05-12 14:16:40
    1:简介 Netty 是由 JBOSS 提供的一个 Java 开源框架,现为 Github上的独立项目。 Netty 是一个异步的、基于事件驱动(客户端的行为、读写事件)的网络...要透彻理解Netty , 需要先学习 NIO , 这样我们才能阅读 Netty
  • 【JVM】最全笔记(黑马+尚硅谷+张龙整合笔记

    千次阅读 多人点赞 2020-03-30 22:11:43
    本身整合了如下视频的笔记,并进行了整理:尚硅谷周阳、张龙、黑马程序员 黑马ppt非常好:https://download.csdn.net/download/hancoder/12834607 本文及JVM系列笔记地址:...1_介绍 1.1_什么是JVM 定义:java virtual ...
  • 尚硅谷JAVA基础课程笔记整理(从面向对象开始)
  • 通过NIO编写简单版聊天室,客户端通过控制台输入发送消息到其他客户端。注意:并未处理粘包半包问题。 逻辑简述 服务器: 1)创建服务器NIO通道,绑定端口并启动服务器 2)开启非阻塞模式 3)创建选择器、并...
  • 【Redis】笔记尚硅谷、黑马整合)

    千次阅读 多人点赞 2020-04-22 21:56:43
    笔记内容包括两个视频的笔记: Redis—尚硅谷java研究院 (推荐)Redis入门到精通【黑马程序员】https://www.bilibili.com/video/BV1CJ411m7Gc 第1章 NoSQL 简介 REmote Dictionary Server:是一种用C语言开发的开源...
  • 学习基于尚硅谷的Netty视频教程,笔记也会查阅其他资料来完善观看视频中本人理解模糊的地方。 Buffer是什么?干啥用的? 缓冲区:本质上是一个可以读写的内存块,可以理解为一个数据容器(数组)。 没有缓冲区...
  • NIO3.1 基本介绍3.2 NIO 和 BIO 的比较3.3 NIO 三大核心3.4 缓冲区(Buffer)3.5 通道(Channel)1. 基本介绍2. FileChannel 类3. 应用实例1-本地文件写数据4. 应用实例2-本地文件读数据5. 应用实例3-使用一个Buffer...
  • 看了尚硅谷宋红康老师的Java基础,按照老师的笔记自己又整理了一份markdown格式的,感觉比教程中eDiary的更清晰有条理。有需要的可以自取,链接在末尾哦~~ 文档预览: 文档链接:...
  • 尚硅谷2019年Netty教程 netty 源码分析 ---- 目标netty—step6.10 https://blog.csdn.net/wei198621/article/details/108872300 尚硅谷2019年Netty教程 Netty中处理耗时操作 ----目标netty—step5.03 https:...
  • 文章目录Dubbo Dubbo
  • 文章目录1. 简介2. 应用场景 1. 简介 Netty 是由 JBOSS 提供的一个 Java 开源框架,现为 Github上的独立项目。...Netty本质是一个NIO框架,适用于服务器通讯相关的多种应用场景 要透彻理解Netty , 需要先学习 N
  • // 使用链式编程进行配置参数 bootstrap.group(bossGroup, workerGroup)// 设置两个线程组 .channel(NioServerSocketChannel.class)// 使用 NioServerSocketChannel 作为服务器的通道实现 .option(ChannelOption.SO...
  • 学习基于尚硅谷的Netty视频教程,笔记也会查阅其他资料来完善观看视频中本人理解模糊的地方。 结合之前的知识,深入理解下NIO非阻塞的网络编程原理,并写一个NIO的demo,demo包含了服务端和客户端 逻辑流程 1....
  • 学习基于尚硅谷的Netty视频教程,笔记也会查阅其他资料来完善观看视频中本人理解模糊的地方。 了解Netty之前需要熟悉 同步、异步、阻塞、非阻塞都是什么意思。 同步: A调用B,需要A主动等待B处理完后返回结果!...
  • 来源于NIO,通过存在堆中的DirectByteBuffer操作Native内存通常,访问直接内存的速度会优于Java堆。即读写性能高。 因此出于性能考虑,读写频繁的场合可能会考虑使用直接内存。 Java的NIO库允许Java程序使用直接内存...
  • NIO 笔记

    2021-03-22 09:56:36
    NIO 笔记尚硅谷NIO简介NIO 与 IO 的主要区别核心 缓冲区(Buffer)和通道(Channel)缓冲区(Buffer)Buffer 常用子类:Buffer 中的重要概念:Buffer 的常用方法直接缓冲区与非直接缓冲区:通道(Channel)通道: 可以...
  • 第十三章 IO流 File类的使用 IO流原理及流的分类 节点流 缓冲流 转换流 标准输入、输出流 打印流 数据流 对象流 随机存取文件流 NIO.2中Path、Paths,Files类的使用 File类的使用 File类的一个对象,代表一个文件或...

空空如也

空空如也

1 2 3 4 5 ... 15
收藏数 284
精华内容 113
关键字:

尚硅谷nio笔记