精华内容
下载资源
问答
  • 通过BufferedReader的ready()方法来解决,其规约中表述:该方法能告诉我们该是否准备好被... InputStream is = socket.getInputStream(); StringWriter writer = new StringWriter(); BufferedReader reader = new

    通过BufferedReader的ready()方法来解决,其规约中表述:该方法能告诉我们该流是否准备好被读取,即流中内容不空时返回true,否则返回false。
    在这里插入图片描述

    代码如下:

    // 输入流
                    InputStream is = socket.getInputStream();
                    StringWriter writer = new StringWriter();
                    BufferedReader reader = new BufferedReader(new InputStreamReader(is));
                    char[] bt = new char[1024];
                    do {
                        reader.read(bt);
                    } while (reader.ready());
                    writer.write(bt,0,bt.length);
                    String request = writer.toString();
    
    展开全文
  • 创建StreamWordCount_Socket.class,使用socket无界处理 import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; //import org.apache.flink.client....

    1、环境

    1. JDK 1.8
    2. Maven
    3. Intellij Idea
    4. Flink1.9.2

    2、创建maven项目

    导入依赖

    注意:这里<flink.version>1.9.2</flink.version>
        <
    scala.binary.version>2.12</scala.binary.version>版本号要与安装的flink版本号一致,否则最后提交时报错。我这里是

    Pom.xml中添加

    <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
            <flink.version>1.9.2</flink.version>
            <scala.binary.version>2.12</scala.binary.version>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
        </dependencies>
    使用socket接收数据,创建StreamWordCount_Socket.class,使用socket无界流处理
    import org.apache.flink.api.common.functions.FlatMapFunction;
    
    import org.apache.flink.api.java.tuple.Tuple2;
    
    //import org.apache.flink.client.program.StreamContextEnvironment;
    
    import org.apache.flink.streaming.api.datastream.DataStream;
    
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import org.apache.flink.util.Collector;
    
    
    
    /**
    
     * @author : Ashiamd email: ashiamd@foxmail.com
    
     * @date : 2021/1/29 11:13 PM
    
     */
    
    public class StreamWordCount_Socket {
    
        //无界流--socket
    
        public static void main(String[] args) throws Exception {
    
    
    
            // 创建流处理执行环境
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    
    
            // 设置并行度,默认值 = 当前计算机的CPU逻辑核数(设置成1即单线程处理)
    
            // env.setMaxParallelism(32);
    
    
    
            // 从文件中读取数据 有界流处理
    
            //String inputPath = "D:\\IdeaProjects\\flink-study\\src\\main\\resources\\hello.txt";
    
            DataStream<String> inputDataStream = env.socketTextStream("localhost",9000);
    
    
    
            // 基于数据流进行转换计算 流处理中数据分组keyBy(相同数据分组)  批处理中分组groupby
    
            DataStream<Tuple2<String,Integer>> resultStream = inputDataStream.flatMap(new MyFlatMapper())
    
                    .keyBy(item->item.f0)
    
                    .sum(1);
    
    
    
            resultStream.print();
    
    
    
            // 执行任务
    
            env.execute();
    
        }
    
        public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {
    
    
    
            @Override
    
            public void flatMap(String s, Collector<Tuple2<String, Integer>> out) throws Exception {
    
                // 按空格分词
    
                String[] words = s.split(" ");
    
                // 遍历所有word,包成二元组输出
    
                for (String str : words) {
    
                    out.collect(new Tuple2<>(str, 1));
    
                }
    
            }
    
        }
    
    }
    端口号与Ubuntu端口号一致。

    3、安装flink

    官网:https://flink.apache.org/downloads.html

    安装了1.9.2版本,1.9以上版本bin中无.Bat文件,我的笔记本上无法运行。

     

     

    运行flink

     

    浏览器进入http://localhost:8081/#/overview

     

    4、安装Ubuntu 使用nc命令发送socket

    本机安装netcat nc命令执行不了,在Windows store中安装Ubuntu子系统,Linux系统中自带nc命令,发送socket。

    5、运行效果:

    打开Ubuntu输入nc -l -k 9000

    9000是端口号,在StreamWordCount_Socket.class中对应输入端口号,idea中运行如下:

     6、提交到flink仪表盘:

    打包

    ​​​​​​​

     

     启动flink

    进入http://localhost:8081/#/overview

    找到项目target中的jar包:

    输入入口和host、port,点击提交

     

    提交成功

    如果在提交时报错可以打开flink log文件查看报错信息,

    一开始提交显示服务器错误,查看log文件发现报错:

    NoClassDefFoundError: org/apache/flink/client/program/StreamContextEnvironment

    原因是pom文件中导入依赖版本和flink版本不同,在1.9.2中没有

    StreamContextEnvironment.class

    原本导入的是1.12.1,重新导入1.9.2的依赖,在1.9.2中使用的是StreamExecutionEnvironment.class

    所以要注意不同版本 依赖中可能有不同的类。

     

    展开全文
  • 本文为大家分享了Java socket字节传输示例,供大家参考,具体内容如下服务端server端:package com.yuan.socket;import java.io.*;import java.net.ServerSocket;import java.net.Socket;/*** Created by YUAN on ...

    本文为大家分享了Java socket字节流传输示例,供大家参考,具体内容如下

    服务端server端:

    package com.yuan.socket;

    import java.io.*;

    import java.net.ServerSocket;

    import java.net.Socket;

    /**

    * Created by YUAN on 2016-09-17.

    */

    public class TalkServer4Byte {

    private ServerSocket server;

    private int port = 5020;

    public TalkServer4Byte() {

    try {

    server = new ServerSocket(port);

    } catch (IOException e) {

    }

    }

    public void talk() {

    System.out.println("监控端口:" + port);

    Socket socket = null;

    while (true) {

    try {

    // 阻塞等待,每接收到一个请求就创建一个新的连接实例

    socket = server.accept();

    System.out.println("连接客户端地址:" + socket.getRemoteSocketAddress());

    // 装饰流BufferedReader封装输入流(接收客户端的流)

    BufferedInputStream bis = new BufferedInputStream(

    socket.getInputStream());

    DataInputStream dis = new DataInputStream(bis);

    byte[] bytes = new byte[1]; // 一次读取一个byte

    String ret = "";

    while (dis.read(bytes) != -1) {

    ret += bytesToHexString(bytes) + " ";

    if (dis.available() == 0) { //一个请求

    doSomething(ret);

    }

    }

    } catch (IOException e) {

    System.out.println(e.getMessage());

    } finally {

    try {

    socket.close();

    } catch (IOException e) {

    System.out.println(e.getMessage());

    }

    }

    }

    }

    public static void doSomething(String ret) {

    System.out.println(ret);

    }

    public static String bytesToHexString(byte[] src) {

    StringBuilder stringBuilder = new StringBuilder("");

    if (src == null || src.length <= 0) {

    return null;

    }

    for (int i = 0; i < src.length; i++) {

    int v = src[i] & 0xFF;

    String hv = Integer.toHexString(v);

    if (hv.length() < 2) {

    stringBuilder.append(0);

    }

    stringBuilder.append(hv);

    }

    return stringBuilder.toString();

    }

    public static String BytesHexString(byte[] b) {

    String ret = "";

    for (int i = 0; i < b.length; i++) {

    String hex = Integer.toHexString(b[i] & 0xFF);

    if (hex.length() == 1) {

    hex = '0' + hex;

    }

    ret += hex.toUpperCase();

    }

    return ret;

    }

    public static void main(String[] args) {

    TalkServer4Byte server = new TalkServer4Byte();

    server.talk();

    }

    }

    客户端client代码:

    package com.yuan.socket;

    import java.io.DataInputStream;

    import java.io.DataOutputStream;

    import java.io.IOException;

    import java.io.InputStream;

    import java.net.InetSocketAddress;

    import java.net.Socket;

    import java.net.SocketAddress;

    /**

    * Created by YUAN on 2016-09-17.

    */

    public class TalkClient4Byte {

    private Socket socket;

    private SocketAddress address;

    public TalkClient4Byte() {

    try {

    socket = new Socket();

    address = new InetSocketAddress("127.0.0.1", 5020);

    socket.connect(address, 1000);

    } catch (IOException e) {

    e.printStackTrace();

    }

    }

    public void talk() {

    try {

    //使用DataInputStream封装输入流

    InputStream os = new DataInputStream(System.in);

    byte [] b = new byte[1];

    DataOutputStream dos = new DataOutputStream(socket.getOutputStream());

    while (-1 != os.read(b)) {

    dos.write(b); // 发送给客户端

    }

    dos.flush();

    dos.close();

    } catch (IOException e) {

    e.printStackTrace();

    } finally {

    try {

    socket.close();

    } catch (IOException e) {

    }

    }

    }

    public static void main(String[] args) {

    TalkClient4Byte client = new TalkClient4Byte();

    client.talk();

    }

    }

    以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

    展开全文
  • os = writeSocket.getOutputStream(); while(!readSocket.isClosed() && !writeSocket.isClosed() && (n=is.read(b))>-1){ //int size = is.read(b); os.write(b, 0, n); os.flush(); } } catch (Exception e) { e....

    问题描述

    InputStream读入字节时,CPU会爆满

    问题出现的环境背景及自己尝试过哪些方法

    经过排查修改,找到了问题,但是很迷惑,不知道为什么会出现这种情况,如图

    f4d2bf007f752324eeb8bbac70b0c3a3.png

    相关代码

    // 请把代码文本粘贴到下方(请勿用图片代替代码)

    @Override

    public void run() {

    byte[] b = new byte[1024];

    InputStream is = null;

    OutputStream os = null;

    int n=0;

    try {

    is = readSocket.getInputStream();

    os = writeSocket.getOutputStream();

    while(!readSocket.isClosed() && !writeSocket.isClosed() && (n=is.read(b))>-1){

    //int size = is.read(b);

    os.write(b, 0, n);

    os.flush();

    }

    } catch (Exception e) {

    e.printStackTrace();

    }finally {

    System.out.println("finally进入");

    try {

    if (is != null) {

    is.close();

    }

    if (null != os) {

    os.flush();

    os.close();

    }

    } catch (IOException e) {

    e.printStackTrace();

    }

    }

    }

    你期待的结果是什么?实际看到的错误信息又是什么?

    我希望了解为什么n=is.read(b)这句代码放在不同的位置会造成完全不同的影响,感谢

    展开全文
  • 在我编程中遇到了一个问题,就是客户端如果不关闭输入的话,服务端与客户端之间的通讯就会卡住,只要一调用socket。shutdownInput关闭输入就正常客户端代码public static void main(String[] args) {try {//1....
  • A能否再次获得输入(inA = socketA.getInputStream())?是否A还可以发送数据,因为其输出没有关闭?问题二:A如果仅仅将输出关闭呢?问题三:A如果将输入输出都关闭,socket还有效吗?如果无效是否可以省略...
  • 这是我的代码:Socket socket = new Socket("someMachine",16003);OutputStream outputStream = socket.getOutputStream();InputStream inputStream = socket.getInputStream();try {outputStream.write...
  • 在我有ServerSocket侦听传入连接的类中,以下是代码:while(isRunning){try{Socket s = mysocketserver.accept();acknowledgeClient(s);new ClientHandler(s).start(); //Start new thread to serve the client, and ...
  • 转自:...在用socket写一个服务器时遇到了问题于是将主要的问题抽了出来,代码如下,由于代码很简单于是也没有注释。public class Main {private static ServerSocket serverSocket;private...
  • 我尝试过使用read() = -1这种方法来判断是否读完字节DataInputStream in = new DataInputStream(socket.getInputStream());ByteArrayOutputStream swapStream = new ByteArrayOutputStream();int read = 0;while ...
  • 一、Java IO 和 系统 IO 不匹配在...操作系统与 Java 基于的 I/O模型有些不匹配。操作系统要移动的是大块数据(缓冲区),这往往是在硬件直接存储器存取(DMA)的协助下完成的。而 JVM 的 I/O 操作类喜欢操作小块数据...
  • 前言本文主要给大家介绍了关于Java中Socket通信的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍吧。Java中基于TCP协议实现网络通信的两个类:客户端的Socket和服务器端的ServerSocket。...
  • 榕树贷款网络IO的本质就是socket流的读取,通常一次IO读取会涉及两个阶段与两个对象,其中两个对象为:用户进程(线程)Process(Thread)、内核对象(kernel),两个阶段为:等待流数据准备阶段、从内核向进程复制...
  • //oos为ObjectOutputStream类型的一个全局引用,指向对象输出对象 oos.writeObject(message); oos.flush(); } catch (Exception e) { e.printStackTrace(); } 服务端: //服务端代码片: //ois为ObjectInputStream...
  • java socket原理

    2021-02-28 08:54:59
    此外,字节服务的自身属性决定了其无法保留输入中消息的边界消息 Socket数据传输的底层实现 一旦创建了一个套接字实例,操作系统就会为其分配缓冲区以存放接收和要发送的数据。 Java可以设置读写缓冲区的大小: ...
  • 经过查询得知,Java的socket是一个全双工套接字,任何的输入或输出的close()都会造成Socket关闭。 解决方案: 将socket的输入输出提到对象中作为参数,等speak和listen线程都结束后,在主函数里关闭。 等...
  • // 是否关闭输入 System.out.println(socket.isOutputShutdown()); // 是否关闭输出 System.out.println("结束:" + format.format(new Date())); } } catch (Exception e) { e.printStackTrace(); } } private...
  • 工作中常常会用到对远程服务的访问,但是又不是同一...(下期介绍)这里先来讲述下,java下socket对象的传输话不多少,用一个小例子来实现。//客户端代码public class SocketClient {Socket client;PrintWriter pw;p...
  • 现在要求我们自己也需要搭建一个socket服务器,要求对方也可以连接我们服务器,获取相关数据。以前没有做个socket连接,刚刚接触服务器也已经设置了一个端口好,可以telnet访问看网络很多关于socket的php介绍...
  • 传输文本信息 ...Socket socket=server.accept();//会阻断 可用新线程执行 //开始从服务端得到数据,将服务器的文件流传入in FileInputStream in=(FileInputStream)scoket.getInputStream(); //若需要传输
  • socket套接字

    2021-10-11 11:31:03
    socket套接字——套接字一、什么是套接字二、套接字的属性1、套接字的域2、套接字的类型3、套接字的协议三、套接字地址1、AF_UNIX2、AF_INET 一、什么是套接字 socket,即套接字是一种通信机制,凭借这种机制,...
  • Java中基于TCP协议实现网络通信的两个类:客户端的Socket和服务器端的ServerSocket。Socket通信模型如图所示:不管Socket通信的功能有多复杂,任何socket通信过程的基本结构都是一样的。其基本步骤为:①分别在...
  • java socket输入输出

    2021-05-30 19:39:36
    Socket提供了如下两个方法来获取输入和输出。 InputStream getInputStream() : 返回Socket对象对于的输入,让程序通过该输入Socket中取出数据。 OutputStream getOutputStream() : 返回Socket对象对应的...
  • Java用Socket解析字节数据背景问题与解决1. Socket连接与数据读取方式的选择2. 内部数据协议的顶层解析过程3. 字节数据解析成Java数据类型的问题 背景 因业务需求,需要完成一个TCP连接数据的解析与转发的插件。 ...
  • 取得摄像头采集的视频2.发送到服务器端` protected MediaRecorder mMediaRecorder; private LocalServerSocket mLss = null; private LocalSocket mReceiver, mSender = null; mLss = new LocalServerSocket(...
  • //创建socket SOCKET RecvSocket; RecvSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); //设置服务器地址 sockaddr_in RecvAddr; int Port = 27015; RecvAddr.sin_family = AF_INET; RecvAddr.sin_port = ...
  • 最近公司在使用socket搞一个内部会议系统,主要包括推,广播,禁言等等的一些功能。 由于观看人数800人第一天上线就炸了。。。。。。自己对于socket还是不太了解,自己学习了一段时间后的一些感悟。 socket原来...
  • 比如Socket。我们就在前次小例子的基础上来修改https://blog.csdn.net/xxkalychen/article/details/117148830。 修改部分很简单。我们另外创建一个测试类,其他都不用修改。 package com.chris.flink

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 268,835
精华内容 107,534
关键字:

socket流