Netty传输文件出现java.io.IOException: Connection reset by peer

weixin_38073649 2015-07-24 12:40:21
@冠超杨 你好,想跟你请教个问题。

这是错误信息:

java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
    at io.netty.buffer.UnpooledUnsafeDirectByteBuf.setBytes(UnpooledUnsafeDirectByteBuf.java:446)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:225)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
    at java.lang.Thread.run(Thread.java:744)

然后我的代码是:

FileServer.java

public class FileServer {
    static final int PORT = ConfigUtils.getInteger("trans.server.file.port");
    static final int BOSSTHREAD_COUNT = ConfigUtils.getInteger("trans.server.bossThread.size");
    static final int WORKERTHREAD_COUNT = ConfigUtils.getInteger("trans.server.workerThread.size");

    public static void main(String[] args) {
        try {
            System.out.println("进入main函数");
            runServer();
            System.out.println("netty server started...");
        } catch (Exception e) {
            System.out.println("netty server start failed!");
            e.printStackTrace();
        }
    }

    /**
     * 启动服务
     */
    public static void runServer() throws Exception{
        EventLoopGroup bossGroup = new NioEventLoopGroup(BOSSTHREAD_COUNT); //开启n个boss线程接受传入的请求
        EventLoopGroup workerGroup = new NioEventLoopGroup(WORKERTHREAD_COUNT); //开启n个worker线程处理boss接受的请求的具体操作
        try {
            ServerBootstrap bootstrap = new ServerBootstrap(); //服务器启动相关操作工具类
            bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class) //使用NioServerSocketChannel作为通道接受到来的请求
                .option(ChannelOption.SO_BACKLOG, 128) // option()接受请求的NioServerSocketChannel的选项设置
                .childOption(ChannelOption.SO_KEEPALIVE, true) //
                .childHandler(new ChannelInitializer<SocketChannel>() { //帮助用户配置一个新的通道,可以添加更多的处理程序实现复杂逻辑
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(
                            new ByteArrayEncoder(),
                            new FileServerHandler()
                        );
                    }
                });
            // Bind and start to accept incoming connections.
            ChannelFuture f = bootstrap.bind(PORT).sync();
            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        }finally{
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

FileServerHandler.java

public class FileServerHandler extends ChannelInboundHandlerAdapter {
    private static String receiveDir = ConfigUtils.getString("trans.server.file.dir");
    private boolean first = true;
    private FileOutputStream fos;
    private BufferedOutputStream bufferedOutputStream;
    private String filePath;
    private long fileLength;
    private long readLength;
    private String fileId;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ....
        //此处代码省略
                //保存文件并更新数据库文件路径
                String directory = receiveDir+FileUtils.getFolderByDate();
                fileId = jsonInfo.getString("file_id");//数据库中的表示该文件数据的Id
                String filetxt = jsonInfo.getString("file_name");
                String fileName = "";
                if(filetxt.contains(".")){
                    fileName = filetxt.substring(filetxt.lastIndexOf("."));
                }
                File file_dir = new File(directory);
                if (!file_dir.exists()) {
                    file_dir.mkdirs();//创建目录
                }
                filePath = directory + ConstUtils.FILE_RECEIVED_PREFIX + fileId+UUIDGenerator.getId()+fileName;//EN_表示未解密
                fileLength = jsonInfo.getLong("file_size");
                File file = new File(filePath);
                if (!file.exists()) {
                    file.createNewFile();//创建文件
                }
                fos =  new FileOutputStream(file);
                bufferedOutputStream = new BufferedOutputStream(fos);
                first = false;
                jsonInfo.put("server_path", filePath);
                CheckInfo.update(jsonInfo);
                JSONObject response = new JSONObject();
                response.put("msgType", ConstUtils.TRANSFER_DATATYPE_C);
                response.put("msg", fileId);
                ctx.writeAndFlush(response.toString().getBytes());
            }else{
                bufferedOutputStream.write(bytes, 0, bytes.length);
                // buf.release();
                readLength += bytes.length;
                System.out.println(readLength+"=="+fileLength+"="+(readLength == fileLength));
                if (readLength >= fileLength) {
                    CheckInfo.updateFileCatchState(fileId);//修改文件获取状态
                    JSONObject response = new JSONObject();
                    System.out.println("----------------start----------------");
                    if (bufferedOutputStream != null ) {
                        bufferedOutputStream.flush();
                        bufferedOutputStream.close();
                        bufferedOutputStream=null;
                    }
                    if (fos != null) {
                        fos.close();
                        fos = null;
                    }
                    System.out.println("----------------buffereoutputStream----------------"+bufferedOutputStream);
                    response.put("msgType", ConstUtils.TRANSFER_DATATYPE_FILE);
                    response.put("msg", ConstUtils.TRANSFER_FILE_STATUS);
                    ctx.writeAndFlush(response.toString());
                    ctx.close();
                }
            }
        } catch (Exception e) {
            System.out.println("文件传输信息:"+message);
            System.out.println("错误信息Exception:"+e.getMessage());
            e.printStackTrace();
            JSONObject response = new JSONObject();
            response.put("msgType", ConstUtils.TRANSFER_DATATYPE_RESPONSE_STATUS);
            response.put("msg", ConstUtils.RESPONSE_STATUS_ERR);
            ctx.writeAndFlush(response.toString());
            ctx.close();
        }finally{
            buf.release();
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        /*if (bufferedOutputStream != null ) {
            bufferedOutputStream.flush();
            bufferedOutputStream.close();
        }
        if (fos != null) {
            fos.close();
        }*/
        // ctx.channel().c
        System.out.println(" fileServerHandler in channelInactive");
        super.channelInactive(ctx);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        try {
            bufferedOutputStream.flush();
            bufferedOutputStream.close();
            fos.close();
            System.out.println("exceptionCaught: "+cause.getStackTrace());
            JSONObject response = new JSONObject();
            response.put("msgType", ConstUtils.TRANSFER_DATATYPE_FILE);
            response.put("msg", ConstUtils.RESPONSE_STATUS_ERR);
            ctx.writeAndFlush(response.toString());
            ctx.close();
        } catch (IOException e) {
            System.out.println("exceptionCaught:-----------");
            e.printStackTrace();
        }
    }
}

可以帮我看看么?这问题我都纠结了几天了,谢谢了!
...全文
960 回复 打赏 收藏 转发到动态 举报
写回复
用AI写文章
回复
切换为时间正序
请发表友善的回复…
发表回复

476

社区成员

发帖
与我相关
我的任务
社区描述
其他技术讨论专区
其他 技术论坛(原bbs)
社区管理员
  • 其他技术讨论专区社区
加入社区
  • 近7日
  • 近30日
  • 至今
社区公告
暂无公告

试试用AI创作助手写篇文章吧