精华内容
下载资源
问答
  • java高并发

    2018-11-05 22:38:03
    本文档主要系统性的总结和阐述了与Java并发相关的知识点
  • 【数据库缓存】【数据库缓存】【服务器的集群化,以及负载均衡】
  • 针对高并发上传图片设计结构,主要针对于WEB开发场景使用
  • 高并发解决方案

    2018-05-07 15:46:24
    高并发解决方案
  • C 语言实现的http文件上传下载服务 系统平台:windows 开发工具:vs2010 开发语言:C 程序为单线程,使用I/O多路复用实现并发 抽取libevent的最最最基础框架,自己封装event 使用BSD tree.h的红黑树
  • java 高并发解决 思路

    2016-09-02 17:55:38
    java 高并发解决思路 文档中涉及到很多实例
  • Java模拟高并发上传数据

    千次阅读 2018-03-07 11:44:13
    Java模拟高并发上传数据 参考博客:JAVA 模拟瞬间高并发 在这一篇博客,我会记录整个我模拟高并发的过程。 从参考的博客那里,我学会了使用线程池和CountDownLatch。 一、模拟高并发初试(小菜) 这个转载的...

    Java模拟高并发上传数据

    参考博客:JAVA 模拟瞬间高并发

    在这一篇博客,我会记录整个我模拟高并发的过程。

    从参考的博客那里,我学会了使用线程池和CountDownLatch。

    一、模拟高并发初试(小菜)

    这个转载的代码,我自己进行尝试之后,为帮助学习,添加了易于了解的注释。

    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * 模拟大坏蛋和小小兵 主线程代表大坏蛋,新建n个小黄人,等待坏蛋下指令; 
     * 大坏蛋下了指令,小黄人开始执行任务,执行完之前,坏蛋要一直等着。
     * 小小兵执行完任务,大坏蛋才能下发新的指令。
     * 
     * @author Administrator
     *
     */
    public class CountdownLatchTest {
    
        public static void main(String[] args) {
            // 创建一个线程池(一个军队)
            ExecutorService service = Executors.newCachedThreadPool();
            // 坏蛋指令,默认为1,变为0时,执行命令
            final CountDownLatch cdOrder = new CountDownLatch(1);
    
            //小黄人数量
            final int soilder_count = 6;
    
            // n个小黄人,一个小黄人执行完任务,cdAnswer减1,n个小黄人执行完毕,cdAnswer为0
            final CountDownLatch cdAnswer = new CountDownLatch(soilder_count);
    
            for (int i = 0; i < soilder_count; i++) {
    
                Runnable runnable = new Runnable() {
    
                    @Override
                    public void run() {
                        //小黄人整装待发
                        System.out.println("小黄人 : " + Thread.currentThread().getName() + " 准备好了.");
    
                        try {
                            cdOrder.await();//等着坏蛋发出指令
                            //小黄人收到指令
                            System.out.println("小黄人 : " + Thread.currentThread().getName() + " 接受命令.");
    
                            Thread.sleep((long) (Math.random() * 10000));
                            //执行完毕,告诉大坏蛋。
                            System.out.println("小黄人 : " + Thread.currentThread().getName() + " 告诉大坏蛋,任务完成.");
    
                        } catch (InterruptedException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        } finally {
                            //执行一个减少一次
                            cdAnswer.countDown();
                        }
    
                    }
                };
    
                service.execute(runnable);// 往执行军队里加派小黄人
            }
    
            try {
                Thread.sleep((long) (Math.random() * 10000));
                System.out.println("After 10s.....");
    
                //马上就要下达指令啦!
                System.out.println("大坏蛋 : " + Thread.currentThread().getName() + " 将要下发指令.");
    
                //发布指令,等待他们执行完
                System.out.println("大坏蛋 : " + Thread.currentThread().getName() + " 发布指令了,快去执行,等你们消息");
    
                cdOrder.countDown();// 发送指令“小黄人出动!”
    
                cdAnswer.await(); // 大坏蛋等着小黄人回来
    
                //收到所有完成的反馈
                System.out.println("大坏蛋 : " + Thread.currentThread().getName() + " 任务都完成了,收到了所有的反馈.");
    
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
    
            service.shutdown();// 整个军队撤回!
        }
    
    }

    代码说明:

    1. ExecutorService service = Executors.newCachedThreadPool();
    这行代码创建了一个线程池。用于存放阻塞的线程。
    2. final CountDownLatch cdOrder = new CountDownLatch(1);
    这行代码就是开关,如何让阻塞在线程池里的线程全部都启动,就像军队里的士兵一样,一个命令让他们全部出动。
    cdOrder.countDown();则值减一,值为零时,就打开开关了。
    3. final CountDownLatch cdAnswer = new CountDownLatch(soilder_count);
    和上一个CountDownLatch 类似,但是这里的值将为一时,表示线程都执行结束了。

    关于线程池和CountDownLatch ,建议参考博客:
    - Java线程池 ExecutorService
    - ExecutorService深入理解
    - Java线程池类ThreadPoolExecutor、ScheduledThreadPoolExecutor及Executors工厂类

    执行结果如下:

    这里写图片描述

    二、真实上传数据测试接口

    模拟上传数据是用okhttp来进行文件的post上传,分别进行了10个并发、20个并发进行测试,每个线程单个文件大小11M。

    测试demo:

    
    import java.io.File;
    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    import okhttp3.MediaType;
    import okhttp3.MultipartBody;
    import okhttp3.OkHttpClient;
    import okhttp3.Request;
    import okhttp3.RequestBody;
    import okhttp3.Response;
    
    /**
     * 模拟高并发上传数据
     * 
     * @author Administrator
     *
     */
    public class LocalUploadData {
    
    
        public static void main(String[] args) {
            // 创建一个线程池
            ExecutorService service = Executors.newCachedThreadPool();
    
            // 计划放置10个线程
            final int countLatch = 10;
    
            // 线程结束标记
            final CountDownLatch dataLatch = new CountDownLatch(countLatch);
    
            // 执行指令
            final CountDownLatch isExecute = new CountDownLatch(1);
    
            // 上传的数据路径和文件名
            String path = "F:/DataTest/";
    
            for (int i = 0; i < countLatch; i++) {
    
                String file_num = "(" + (i + 1) + ")";
    
                Runnable runnable = new Runnable() {
    
                    String file_name = "data-test-" + file_num + ".zip";// data-test-(1).zip
    
                    String file_path = path + file_name;
    
                    LocalUploadData udc = new LocalUploadData();
    
                    @Override
                    public void run() {
                        System.out.println("current file_name=" + file_name);
                        System.out.println(Thread.currentThread().getName() + "准备好了...");
                        try {
                            //等待isExecute的值变为0,然后执行
                            isExecute.await();// 等待执行
    
                            System.out.println(Thread.currentThread().getName() + "开始上传>>>>>>>>");
                            long start_time = System.currentTimeMillis();
    
                            // 上传数据
                            int response_code = udc.httpPostData(file_path, file_name);
                            System.out.println("响应结果:"+response_code);
    
                            long end_time = System.currentTimeMillis();
                            long using_time = end_time - start_time;
    
                            // 最后一个文件上传目前不启用
                            System.out.println(Thread.currentThread().getName() + "用时 : " + using_time + " ms");
    
                        } catch (InterruptedException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        } finally {
                            dataLatch.countDown();
                        }
                    }
                };
                service.execute(runnable);//把线程都放进线程池里等待
            }
    
            try {
                Thread.sleep((long) (Math.random() * 10000));
                System.out.println("After 10s.....");
    
                System.out.println(Thread.currentThread().getName() + " 准备上传!");
    
                System.out.println(Thread.currentThread().getName() + " 开始上传");
    
                //isExecute的值减一,开始执行
                isExecute.countDown();
    
                //等待dataLatch的值变为0,表示线程全部执行结束。
                dataLatch.await();
    
                System.out.println(Thread.currentThread().getName() + " 上传结束.");
    
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } finally {
                service.shutdown();
            }
        }
    
    /**
    * okhttp上传数据demo
    */
        public int httpPostData(String file_path, String filename) {
    
            OkHttpClient.Builder client = new OkHttpClient.Builder().connectTimeout(30, TimeUnit.SECONDS).readTimeout(80,
                    TimeUnit.SECONDS);
    
            File file = new File(file_path);
    
            RequestBody fileBody = RequestBody.create(MediaType.parse("image/*"), file);
    
            RequestBody body = new MultipartBody.Builder().setType(MultipartBody.FORM)
                    .addFormDataPart("file", file.getName(), fileBody)
                    .addFormDataPart("file_name", filename).build();
    
            Request request = new Request.Builder().url("http://192.168.1.68:8080/upload-data/l/localUploadData").post(body).build();
            Response response;
            try {
                response = client.build().newCall(request).execute();
                String jsonString = response.body().string();
                if (!response.isSuccessful()) {
                    System.out.println("NetworkException>>" + response.code() + "\n" + jsonString);
                } else {
                    System.out.println("response>>" + response.code() + "\n" + jsonString);
                    return response.code();
    
                }
            } catch (IOException e) {
                e.printStackTrace();
                return 0;
            }
    
            return 0;
    
        }
    
    }
    
    

    一个接口分别在两个地方进行测试:
    - 本地jetty-run运行
    - 部署在阿里云服务器上

    1、本地jetty-run运行的接口
    这里写图片描述
    这里写图片描述

    从上面三组测试结果能看出,10组数据响应时间<6s,这个还能在心理接受范围之内,但是20组数据的时候,最短的响应时间>6s,最长的响应时间超过了27s。

    在并发数据大的时候,线程阻塞导致没有一个线程能极速处理,这样每个处理时间都很长,我预想的响应效果,起码要达到从3s到20s之间,就是有的用户响应快,有的响应慢,根据用户的网速从而达到一个梯度,而不是让所有用户都觉得慢。

    2、部署在阿里云服务器上的接口

    部署到线上的测试更是惨不忍睹,非常非常的慢,已经超出我能容忍的范围,这是让人抓狂的慢。
    这里写图片描述
    这里写图片描述
    这里写图片描述

    从这里能看到,其实服务器写出文件速度非常快的,但是接口响应很慢,能达到85s。(╯‵皿′)╯︵┻━┻

    以上就是高并发测试上传数据的demo和测试记录。
    下一篇就是针对这么慢的响应进行接口的优化。

    展开全文
  • 针对高并发处理文件解决方案

    千次阅读 2019-02-28 21:43:05
    但如果并发高,在我们对文件进行读写操作时,很有可能多个进程对进一文件进行操作,如果这时不对文件的访问进行相应的独占,就容易造成数据丢失。 例如:一个在线聊天室(这里假定把聊天内容写入文件),在同一时刻...

    对于日IP不高或者说并发数不是很大的应用,一般不用考虑这些!用一般的文件操作方法完全没有问题。但如果并发高,在我们对文件进行读写操作时,很有可能多个进程对进一文件进行操作,如果这时不对文件的访问进行相应的独占,就容易造成数据丢失。
    例如:一个在线聊天室(这里假定把聊天内容写入文件),在同一时刻,用户A和用户B都要操作数据保存文件,首先是A打开了文件,然后更新里面的数据,但这里B也正好也打开了同一个文件,也准备更新里面的数据。当A把写好的文件保存时,这里其实B已经打开了文件。但当B再把文件保存回去时,这里已经造成了数据的丢失,因为这里B用户完全不知道它所打开的文件在它对其进行更改时,A用户也更改了这个文件,所以最后B用户保存更改时,用户A的更新就被会丢失。
    对于这样的问题,一般的解决方案时当一进程对文件进行操作时,首先对其它进行加锁,意味着这里只有该进程有权对文件进行读取,其它进程如果现在读,是完全没有问题,但如果这时有进程试图想对其进行更新,会遭到操作拒绝,先前对文件进行加锁的进程这时如果对文件的更新操作完毕,这就释放独占的标识,这时文件又恢复到了可更改的状态。接下来同理,如果那个进程在操作文件时,文件没有加锁,这时,它就可以放心大胆的对文件进行锁定,独自享用。
    一般的方案会是:

    复制代码代码如下:


    $fp=fopen('/tmp/lock.txt','w+');
    if (flock($fp,LOCK_EX)){
        fwrite($fp,"Write something here\n");
        flock($fp,LOCK_UN);
    }else{
        echo 'Couldn\'t lock the file !';
    }
    fclose($fp);


    但在PHP中,flock似乎工作的不是那么好!在多并发情况下,似乎是经常独占资源,不即时释放,或者是根本不释放,造成死锁,从而使服务器的cpu占用很高,甚至有时候会让服务器彻底死掉。好像在很多linux/unix系统中,都会有这样的情况发生。所以使用flock之前,一定要慎重考虑。
    那么就没有解决方案了吗?其实也不是这样的。如果flock()我们使用得当,完全可能解决死锁的问题。当然如果不考虑使用flock()函数,也同样会有很好的解决方案来解决我们的问题。经过我个人的搜集和总结,大致归纳了解决方案有如下几种。
    方案一:对文件进行加锁时,设置一个超时时间。大致实现如下:

    复制代码代码如下:


    if($fp=fopen($fileName,'a')){
     $startTime=microtime();
     do{
      $canWrite=flock($fp,LOCK_EX);
      if(!$canWrite){
       usleep(round(rand(0,100)*1000));
      }
     }while((!$canWrite)&&((microtime()-$startTime)<1000));
     if($canWrite){
      fwrite($fp,$dataToSave);
     }
     fclose($fp);
    }


    超时设置为1ms,如果这里时间内没有获得锁,就反复获得,直接获得到对文件操作权为止,当然。如果超时限制已到,就必需马上退出,让出锁让其它进程来进行操作。

     

    方案二:不使用flock函数,借用临时文件来解决读写冲突的问题。大致原理如下:
    (1)将需要更新的文件考虑一份到我们的临时文件目录,将文件最后修改时间保存到一个变量,并为这个临时文件取一个随机的,不容易重复的文件名。
    (2)当对这个临时文件进行更新后,再检测原文件的最后更新时间和先前所保存的时间是否一致。
    (3)如果最后一次修改时间一致,就将所修改的临时文件重命名到原文件,为了确保文件状态同步更新,所以需要清除一下文件状态。
    (4)但是,如果最后一次修改时间和先前所保存的一致,这说明在这期间,原文件已经被修改过,这时,需要把临时文件删除,然后返回false,说明文件这时有其它进程在进行操作。
    实现代码如下:

    复制代码代码如下:


    $dir_fileopen='tmp';
    function randomid(){
        return time().substr(md5(microtime()),0,rand(5,12));
    }
    function cfopen($filename,$mode){
        global $dir_fileopen;
        clearstatcache();
        do{
      $id=md5(randomid(rand(),TRUE));
            $tempfilename=$dir_fileopen.'/'.$id.md5($filename);
        } while(file_exists($tempfilename));
        if(file_exists($filename)){
            $newfile=false;
            copy($filename,$tempfilename);
        }else{
            $newfile=true;
        }
        $fp=fopen($tempfilename,$mode);
        return $fp?array($fp,$filename,$id,@filemtime($filename)):false;
    }
    function cfwrite($fp,$string){
     return fwrite($fp[0],$string);
    }
    function cfclose($fp,$debug='off'){
        global $dir_fileopen;
        $success=fclose($fp[0]);
        clearstatcache();
        $tempfilename=$dir_fileopen.'/'.$fp[2].md5($fp[1]);
        if((@filemtime($fp[1])==$fp[3])||($fp[4]==true&&!file_exists($fp[1]))||$fp[5]==true){
            rename($tempfilename,$fp[1]);
        }else{
            unlink($tempfilename);
      //说明有其它进程 在操作目标文件,当前进程被拒绝
            $success=false;
        }
        return $success;
    }
    $fp=cfopen('lock.txt','a+');
    cfwrite($fp,"welcome to beijing.\n");
    fclose($fp,'on');


    对于上面的代码所使用的函数,需要说明一下:
    (1)rename();重命名一个文件或一个目录,该函数其实更像linux里的mv。更新文件或者目录的路径或名字很方便。但当我在window测试上面代码时,如果新文件名已经存在,会给出一个notice,说当前文件已经存在。但在linux下工作的很好。
    (2)clearstatcache();清除文件的状态.php将缓存所有文件属性信息,以提供更高的性能,但有时,多进程在对文件进行删除或者更新操作时,php没来得及更新缓存里的文件属性,容易导致访问到最后更新时间不是真实的数据。所以这里需要使用该函数对已保存的缓存进行清除。

     

    方案三:对操作的文件进行随机读写,以降低并发的可能性。
    在对用户访问日志进行记录时,这种方案似乎被采用的比较多。先前需要定义一个随机空间,空间越大,并发的的可能性就越小,这里假设随机读写空间为[1-500],那么我们的日志文件的分布就为log1~到log500不等。每一次用户访问,都将数据随机写到log1~log500之间的任一文件。在同一时刻,有2个进程进行记录日志,A进程可能是更新的log32文件,而B进程呢?则此时更新的可能就为log399.要知道,如果要让B进程也操作log32,概率基本上为1/500,差不多约等于零。在需要对访问日志进行分析时,这里我们只需要先将这些日志合并,再进行分析即可。使用这种方案来记录日志的一个好处时,进程操作排队的可能性比较小,可以使进程很迅速的完成每一次操作。

    方案四:将所有要操作的进程放入一个队列中。然后专门放一个服务完成文件操作。队列中的每一个排除的进程相当于第一个具体的操作,所以第一次我们的服务只需要从队列中取得相当于具体操作事项就可以了,如果这里还有大量的文件操作进程,没关系,排到我们的队列后面即可,只要愿意排,队列的多长都没关系。

    对于以前几种方案,各有各的好处!大致可能归纳为两类:
    (1)需要排队(影响慢)比如方案一、二、四
    (2)不需要排队。(影响快)方案三
    在设计缓存系统时,一般我们不会采用方案三。因为方案三的分析程序和写入程序是不同步的,在写的时间,完全不考虑到时候分析的难度,只管写的行了。试想一下,如我们在更新一个缓存时,如果也采用随机文件读写法,那么在读缓存时似乎会增加很多流程。但采取方案一、二就完全不一样,虽然写的时间需要等待(当获取锁不成功时,会反复获取),但读文件是很方便的。添加缓存的目的就是要减少数据读取瓶颈,从而提高系统性能。
    从上为个人经验和一些资料的总结,有什么不对的地方,或者没有谈到的地方,欢迎各位同行指正。

    展开全文
  • 高并发集群式的文件传输系统

    千次阅读 2019-05-29 21:17:17
    允许上传文件名不同的相同文件,这里会在第一个数据库中添加一个新的文件名对应的项,在第二个表中该文件对应的hash 的计数会加一, 删除一个文件的时候,相应的引用计数减一,当引用计数为0的时候,删除数据库中的...

    总体设计思路

    集群是多个服务器同时去做一个事情,相比于单个的服务器,集群可同时处理更多的业务,但是集群的设计上也必须要处理好各个服务器之间的关系,首先要考虑的就是服务器的负载均衡、数据的备份、多个服务器之间的数据的同步的问题,要根据服务器实现的业务来确定服务器采用什么样的同步的方式才能使得同步的效率更高。

    在本次的项目中,采用一个负载均衡器,负责根据服务器的当前的负载的情况向客户端分发一个负载相对较小的服务器,然后负载均衡器断开与客户端的连接,客户端拿到服务器的地址后,直接与服务器连接,中间的数据不经过负载均衡器。

    这样做的原因,因为本系统是文件传输系统客户端与服务器的交互大多是上传下载,逻辑判断的部分不是主要消耗资源的部分,如果每条消息都经过负载均衡器,所有的客户端的连接都集中在负载均衡器上,这样就丧失了集群的优点。

    负载均衡器除了要负责向客户端分发服务器以外,还要处理服务器的数据同步的问题,如果有一台新上线的服务器,新的服务器要先向负载均衡器发送信息同步的请求,这时候,负载均衡器分配一个服务器向新上线的服务器发送同步的数据。

    这样每台服务器既需要于客户端进行交互,又需要与其他的服务器进行交互,如果只通过一个端口进行数据的收发,对服务器来说就需要进行的消息来源的判断。这样在服务器端的编程的难度会加大,逻辑也不容易理清,所以,这里我才用一台服务器监听两个端口的数据,一个端口负责与客户端的通信,另一个端口负责与其他的服务器的交互。着这样两个端口的信息的通信并不影响,采用并发的方式的需要也不容易出错。

    对于数据同步问题的处理,这个问题我分了两种情况

    1. 不存在新上线的服务器的情况
      这是最简单的情况,采用的方法就是当本地服务器上的数据发生改变的时候,立即向其他的服务器发送数据同步的消息,其他服务器根据消息做出相应的改变。
    2. 当有新上线的服务器正在同步数据,这时候其他的服务器上有同步的消息发过来。
      处理的方式,当新的服务器上线的时候,向负载均衡器发送一个同步的请求,负载均衡器这时候生成一个时间戳,发送给两个要进行数据同步的服务器,新上线的服务器需要同步的是该时间戳之前的数据,在该时间戳之后的消息都被视为新的同步消息二保存起来,当两个服务器之间的数据同步全部完成的时候,两台服务器开始处理新的同步请求。
      服务器数据的备份,这里没有去刻意备份服务器上的数据,而是采用多台服务器之间互为备份的方法,数据同步就是备份的过程。即能保证多台服务器之间数据的一致性,也有备份的过程。

    服务器端的设计

    在同步的方式下,当我们发送给一个请求的时候,希望得到对方的确认,如果此刻我们阻塞等待消息的到来,而一个线程同时处理着多个连接,此时其他的请求我们也就不能去执行了,这对于其他的客户端是不公平的,而如果我们此刻不再等待去处理其他的请求,当该确认消息到达的时候,我们怎样确定用什么样的方式的去处理该确认消息呢?

    因为每个连接处理是串行的,例如:当一个客户端在上传文件的时候,不能进行其他的操作,只能等待,这样我们可以给每个连接一个状态,当接收到up 的命令后,我们知道接下来的数据就是文件的内容,这时候,服务器接收到一个up 请求之后,就将该链接的状态改为RECVINF_STATUS ,在接收的消息之后,判断是这个状态,就将接收到的消息写到文件中。
    这样对于我们可以给每一个连接设定几个状态,以标志下一条消息的处理方式
    服务器端的几个状态

    #define RECV_STATUS 1  //正在接收文件
    #define CMD_STATUS 2   //客户端发送过来的消息作为命令处理
    #define WAIT_AFFIRM 3	//等待客户端的确认
    #define SEND_STATUS 4	//正在发送文件
    

    客户端的几个状态:

    #define DOWN_AFFIRM_STATUS 9  //下载文件时等待对方的确认状态
    #define UP_AFFIRM_STATUS 5   //上传文件的时候等待对方的确认状态
    #define RECVING_STATUS 6	//正在接收文件的状态
    #define SENDING_STATUS 7   //正在发送文件的状态
    #define CMD_STATUS 8	   //命令状态,此刻从键盘终端获取消息的状态
    

    多并发的方式:采用muduo的reactor模式,对于一个客户端的处理只能是一个线程,不能出现多个线程同时处理一个客户端请求的情况。

    新上线的服务器要向负载均衡服务器发送给自己的ip地址和端口号,负载均衡服务器接收到信号之后,要通知一个在线的服务器,向新上线的服务器发送同步的消息。

    服务器端保存的是文件都是以文件的hash 值命名,文件名存储在数据库中,与相应的hash 对应,其结构为:
    在这里插入图片描述
    在这里插入图片描述
    第一个表中存储文件名, hash , flag 表示该文件是否完整,
    flag的作用是,如果文件不完整,当客户端再次上传的时候,先判断文件名,再判断hash值,如果都相同的话,从断点的位置开始上传,不需要从头在重新上传
    hash 的作用是,当上传的文件在本地已经存在的时候,不需要重复上传,这样就节省了上传的时间。
    因为当接受到新的文件的时候,当前的服务器需要向其他的服务器发送同步数据的消息,如果上传的消息本地已经存在,就不用在向其他的服务器来发送同步的消息了,这样对其他的服务器也是一种节省开销的方式,对用户来说,提高了上传的速度,也是一种很好的用户体验,
    第二个表,hash 值相同的文件的引用计数。允许上传文件名不同的相同文件,这里会在第一个数据库中添加一个新的文件名对应的项,在第二个表中该文件对应的hash 的计数会加一, 删除一个文件的时候,相应的引用计数减一,当引用计数为0的时候,删除数据库中的记录,并且删除本地文件。

    负载均衡器的设计

    负载均衡服务器处在客户端与服务器的中间,起始就在一开始的时候在两者之间,之后当客户端与服务器连接的时候,负载均衡器与客户端的连接就断开了,所以负载均衡服务器的开销还是很小的,因为这种客户端与服务器的连接属于长连接,不会出现客户端频繁请求负载均衡服务器的情况。

    客户端的设计

    客户端同样采用了非阻塞IO 的方式,尽管这里我觉得是没有必要的,因为在设计的最初,就确定了操作是串行的,当上传文件的时候,不能其他的操作。在具体的实现的过程中其实是可以的,客户端这里采用了异步的方式,当没有消息的时候,会阻塞在键盘的输入端,获取下一条命令。同样,在客户端并没有在指定的位置去等待消息的到来,这样,当消息到来的时候,怎让判断消息的类型,用谁来处理消息,成了问题。
    这里采用了与服务器同样的方式,设置当前客户端的状态。以不同的状态处理不同的消息的类型,之所以能够这样做,就是因为tcp通信的特点,消息到来的顺序并不会改变,所以不管有多小跳消息到来,我们都能确定每条消息在缓冲区中的位置,从而取出指定的消息进行过操作。
    总体的框架就是这样,具体实现可以根据不同的功能进行填充。

    总结

    对异步IO 消息的处理
    对集群的整体的设计思想
    以上的设计并不都是在最初的时候就设计出的,很对都是在实现的时候,根据出现的bug 修改出得出的。所以编程的实践很重要。

    代码实现

    https://github.com/ckl666/file-system

    展开全文
  • 本人整理的文件上传,性能,支持多个,用到了JQuery中的插件,比较方便,易懂
  • 最近接了某需求,是需要把文字转换成语音MP3文件,存入fastdfs,然后把文件路径存入数据库。 我们fastdfs是5.05版本,fastdfs工具类已经改成支持并发了,每次获取新的服务端对象 public class FastClient<main...

    最近接了某需求,是需要把文字转换成语音MP3文件,存入fastdfs,然后把文件路径存入数据库。

    我们fastdfs是5.05版本,fastdfs工具类已经改成支持并发

    public class FastClient<main> {
    
    
        private static Logger logger = Logger.getLogger(FastClient.class);
    
        /**
         * 只加载一次.
         */
        static {
            try {
                ClientGlobal.init("fdfs_client.properties");
                TrackerClient trackerClient = new TrackerClient(ClientGlobal.getGtrackerGroup());
                TrackerServer trackerServer = trackerClient.getConnection();
                if (trackerServer == null) {
                    logger.error("getConnection return null");
                }
                StorageServer storageServer = trackerClient.getStoreStorage(trackerServer);
                if (storageServer == null) {
                    logger.error("getStoreStorage return null");
                }
                // 这里有坑,具体参考文章:http://www.ityouknow.com//fastdfs/2017/12/26/fastdfs-concurrent.html
                // 根本原因:storageServer在高并发使用时有可能被置空,storageClient1使用时报NPE
                // 解决办法:放弃共享 storageClient1方式,每次调用时自己new出来
                // 所以注掉
    //			storageClient1 = new StorageClientExtend(trackerServer, storageServer);
    
            } catch (Exception e) {
                logger.error(e);
            }
        }
    
    
        /**
         * description 获取StorageClient 实例,并发可用
         * param
         * return 
         * author 
         * createTime 2019/4/27 13:12
         **/
        private static StorageClientExtend getStorageClientExtend() throws IOException {
            TrackerClient trackerClient = new TrackerClient(ClientGlobal.getGtrackerGroup());
            TrackerServer trackerServer = trackerClient.getConnection();
            if (trackerServer == null) {
                logger.error("getConnection return null");
            }
            logger.info( "获取socket连接===》"+trackerServer.getSocket() );
            //给dfs发送一个消息
            ProtoCommon.activeTest(trackerServer.getSocket());
    
            StorageServer storageServer = trackerClient.getStoreStorage(trackerServer);
            return  new StorageClientExtend(trackerServer, storageServer);
        }
    
    
      /**
         *
         * @param fis
         *            文件
         * @param fileName
         *            文件名
         * @return 返回Null则为失败
         */
        public static synchronized String uploadFile(InputStream fis, String fileName) {
            try {
    
                NameValuePair[] metaList = null;
                byte[] fileBuff = null;
                if (fis != null) {
                    int len = fis.available();
                    fileBuff = new byte[len];
                    int i = fis.read(fileBuff);
                }
    
                StorageClientExtend storageClientExtend = getStorageClientExtend();
                String uploadFilepath = storageClientExtend.uploadFile1(fileBuff, getFileExt(fileName), metaList);
                logger.info("uploadFile 文件名称 uploadFilepath : " + uploadFilepath);
                return uploadFilepath;
            } catch (Exception ex) {
                logger.warn("uploadFile上传文件异常 Exception:{}",ex);
                logger.error(ex);
                return null;
            }finally{
                if (fis != null){
                    try {
                        fis.close();
                    } catch (IOException e) {
                        logger.warn("uploadFile IOException 上传文件异常 , Exception:{}",e);
                        logger.error(e);
                    }
                }
            }
        }
    
    
    

     

    贴一下业务代码大家看看

    
        public void generateVoice(MessageNoticeDTO messageNoticeDTO, Integer type)throws NoSuchAlgorithmException, InvalidKeyException, MalformedURLException {
            Long messageNoticeId = messageNoticeDTO.getId();
            String content = messageNoticeDTO.getContent();
            Long messageNoticeCreateTime = messageNoticeDTO.getCreateTime();
            OkHttpClient client = new OkHttpClient.Builder()
                    //超时时间
                    .connectTimeout(60, TimeUnit.SECONDS)
                    //读取时间
                    .readTimeout(60, TimeUnit.SECONDS)
                    .build();
    
            //科大讯飞构建鉴权
            Request request = authentication();
            // 开启webSocket
            client.newWebSocket(request, new WebSocketListener() {
    
                //调用科大接口
                @Override
                public void onOpen(WebSocket webSocket, Response response) {
                    super.onOpen(webSocket, response);
                    //发送数据
                    JSONObject frame = organizationRequestData(content);
                    webSocket.send(frame.toString());
                }
    
                //获取语音流
                @SneakyThrows
                @Override
                public void onMessage(WebSocket webSocket, String text) {
                    super.onMessage(webSocket, text);
                    JSONObject object = JSONObject.parseObject(text);
                    //处理返回数据
                    ResponseData resp = JSONObject.toJavaObject(object, ResponseData.class);
    
                    if (null == resp || resp.getCode() != 0 || resp.getData() == null) {
                        log.error("获取音频数据异常,向重试队列发送消息,快讯id:{},响应:{}", messageNoticeId, resp);
                        throw new NullPointerException("获取音频数据异常");
                    }
    
                    //resp.data.status ==2 说明数据全部返回完毕,可以关闭连接,释放资源
                    if (resp.getData().getStatus() == 2) {
                        //上传到fastfds
                        String uploadFilepath = uploadFile(resp);
                        if (StringUtils.isBlank(uploadFilepath)) {
                            log.error("上传到文件服务器异常,向重试队列发送消息,快讯id:{}", messageNoticeId);
                            throw new NullPointerException("上传到文件服务器异常 获取音频数据url异常");
                        }
                        log.info("快讯语音数据url生成,id:{},voiceUrl:{}", messageNoticeId, uploadFilepath);
                        //更新快讯数据信息
                        modifyMessageNotice(messageNoticeDTO, uploadFilepath);
                        log.info("快讯语音数据更新完成,id:{},voiceUrl:{}", messageNoticeId, uploadFilepath);
                        webSocket.close(1000, "获取音频完成");
    
                    }
                }
    
                //websocker异常回调
                @Override
                public void onFailure(WebSocket webSocket, Throwable t, Response response) {
                    super.onFailure(webSocket, t, response);
                    webSocket.close(1000, "获取音频完成");
                    //异常情况处理
                    log.error("科大异常,快讯id入参:{},Throwable:{},Response:{}", messageNoticeId, t, response);
                    voiceMessageNoticeAo.handleInfoMessageVoiceFail(messageNoticeId, messageNoticeCreateTime, type);
                }
            });
    
        }
    
        /**
         * @param resp:
         * @Description:
         * @Author:
         * @Date: 2021/3/25 11:11
         * @return: java.lang.String
         **/
        private String uploadFile(ResponseData resp) {
            String result = resp.getData().getAudio();
            byte[] audio = Base64.getDecoder().decode(result);
            //上传到fastfds
            return FastClient.uploadFile(new ByteArrayInputStream(audio), ".mp3");
        }
    
     /**
         * @param messageNoticeDTO: 快讯dto对象
         * @Description: 更新短讯表文件路径
         * @Author:
         * @Date: 2021/3/12 11:12
         * @return: void
         **/
        private void modifyMessageNotice(MessageNoticeDTO messageNoticeDTO, String uploadFile) throws MessageNoticeException {
            messageNoticeDTO.setVoiceUrl(FAST_DFS_DOMAIN + uploadFile);
            messageNoticeService.modifyMessageNotice(messageNoticeDTO);
        }

    这是获取科大讯飞然后转码上传fastdfs代码,一开始在测试的时候并发达到200,500左右都是很正常的。但是到了线上环境,峰值达到每秒几百个,就受不了,在上传的时候出现各种异常

    java.net,SocketException:Connection closed by remote host,Response:null

    java.net.SocketException:timeout,Response:null

    java.io.EOFException,Response:null

    javax.net.ssl.SSLException:SSL peer shut down incorrectly,Response:null

    java.io.IOException:recv package size -1 ! = 10

     

    会抛出各种上传异常或者sockertExecption,但是这些异常都是偶发性的。比如上传1000次,可能有十几次失败,也可能百十次的失败。

    刚开始以为是配置的通讯时间问题,修改了参数,改的大了一点,发现确实管点事,但是不能从根本解决问题。

    #连接超时时间,针对socket套接字函数connect
    connect_timeout = 300
    #网络通讯超时时间,默认是60秒
    network_timeout = 600

    从网上查了一下,都说是客户端和服务端连接断开了,需要重试。可以服务端主动给客户端发送消息,保持连接

            ProtoCommon.activeTest(trackerServer.getSocket());

    在测试发现后,加这个也不管事,此时已经有点束手无策,肝源码看看吧

    在  ProtoCommon.activeTest(trackerServer.getSocket());方法中,会发送一个空包给服务端,让服务端不断开连接,但是这个请求发出去就抛异常了。

    public static RecvHeaderInfo recvHeader(InputStream in, byte expect_cmd, long expect_body_len) throws IOException
    	{
    		byte[] header;
    		int bytes;
    		long pkg_len;
    		
    		header = new byte[FDFS_PROTO_PKG_LEN_SIZE + 2];
    		
    		if ((bytes=in.read(header)) != header.length)
    		{
    			throw new IOException("recv package size " + bytes + " != " + header.length);
    		}
    		
    		if (header[PROTO_HEADER_CMD_INDEX] != expect_cmd)
    		{
    			throw new IOException("recv cmd: " + header[PROTO_HEADER_CMD_INDEX] + " is not correct, expect cmd: " + expect_cmd);
    		}
    		
    		if (header[PROTO_HEADER_STATUS_INDEX] != 0)
    		{
    			return new RecvHeaderInfo(header[PROTO_HEADER_STATUS_INDEX], 0);
    		}
    		
    		pkg_len = ProtoCommon.buff2long(header, 0);
    		if (pkg_len < 0)
    		{
    			throw new IOException("recv body length: " + pkg_len + " < 0!");
    		}
    		
    		if (expect_body_len >= 0 && pkg_len != expect_body_len)
    		{
    			throw new IOException("recv body length: " + pkg_len + " is not correct, expect length: " + expect_body_len);
    		}
    		
    		return new RecvHeaderInfo((byte)0, pkg_len);
    	}
    

    在上传的代码里也会调用这个方法,一上传包就挂了

    解放方案:

    高并发情况下扩大fastdfs最大连接数,根据服务器情况酌情扩大。

    fastdfs5.05版本最大连接数256,科大讯飞语音接口,异步方法接收响应,然后在直接处理上传,相当于来了多少请求就会开多少连接,很快就会打满最大连接数,造成后面的上传异常,只有前面的上传完成后,放开了才能获取到服务端的连接,但是异步处理方法不是获取不到就阻塞住的,他是直接new了去上传,然后服务端没有连接线程来处理这个,就会返回一个空的路径或者是连接异常这种错误。

     

     

     

     

     

     

     

    展开全文
  • C语言编写高并发Http文件上传下载服务器前言项目效果图项目介绍环境介绍程序结构之:event相关程序结构之:http相关第一种 获取文件列表类 POST请求第二种 获取文件列表类 GET请求第三种 获取文件内容类 GET请求源码...
  • 高并发复用数据库链接技术详解之数据库连接池 类加载器的高级特性(自定义类加器实现加密解密) iBATIS开源主流框架(实现半自动化hibernate) 企业实用技能之详解(眼睛横纹模式验证码防止恶意登陆) 动态页面的静态化...
  • C#调用httplistener实现简单的http服务器例子:编译后是一个控制台应用程序,启动后,可通过 http://127.0.0.1/ 访问,采用了回调模式提供http服务,支持高并发
  • LIBEVENT:是一款事件驱动的网络开发包,由于采用C语言开发...讲解HTTP服务端开发示例,HTTP客户端请求开发示例,最后基于 LIBEVENTT创建线程池C++框架,并用此框架完成FTP服务器的登录、目录访问、文件上传下载能功
  • IIS网站高性能高并发优化解决方案,通过修改IIS配置结合window定时任务优化IIS下网站的高并发访问性能
  • 服务器的登录、目录访问、文件上传下载能功能。 开发环境 操作系统: windows1064位和 ubuntu18.0464位 Libevent版本:2.1.8 Windows开发工具:VS2017社区版 ubuntu开发工具:g++make 开发语言:C/C++ 课程...
  • 服务器在Linux平台上编译,采用epoll高并发框架,可以高效,大并发的文件上传下载服务。客户端可在windows平台上用vs编译,编译出来是个dll。windows 可以直接调用此dll,实现文件的上传,下载。
  • NetCore文件上传两种方式  NetCore官方给出的两种文件上传方式分别为“缓冲”、“流式”。我简单的说说两种的区别, ... 文件上传所用的资源(磁盘、内存)取决于并发文件上传的数量和大小。 如果应用尝试
  • 高并发架构解析

    2021-02-24 00:48:35
    高并发经常会发生在有大活跃用户量,用户高聚集的业务场景...一个可以支持高并发的服务少不了好的服务器架构,需要有均衡负载,数据库需要主从集群,nosql缓存需要主从集群,静态文件需要上传cdn,这些都是能让业务程序
  • 通过腾讯云云函数 SCF 可以实现对象存储 COS 中的文件自动解压缩、其中 COS 用于存储上传后需要解压的 .zip 文件及解压后的文件,SCF 实现对 .zip 文件上传至 COS 后的自动解压缩。同时还适用于对相关音视频文件上传...
  • java ftp上传文件 支持并发

    千次阅读 2018-11-29 10:21:42
    * @param originfilename 待上传文件的名称(绝对地址) * * @return */ public static boolean uploadFile( String pathname, String fileName,String originfilename){ boolean flag = false; InputStream...
  • 高并发经常会发生在有大活跃用户量,用户高聚集的业务场景中,如:...一个可以支持高并发的服务少不了好的服务器架构,需要有均衡负载,数据库需要主从集群,nosql缓存需要主从集群,静态文件需要上传cdn,这些都是能
  • 用SocketAsyncEventArgs通讯封装、服务端实现日志查看、SCOKET列表、上传、下载、远程文件流、吞吐量协议,测试SocketAsyncEventArgs的性能和压力,最大连接数支持65535个长连接,最高命令交互速度达到250MB/S(使用...
  • 微服务Springboot实战大揭秘/高并发/高可用/高负载/互联网技术—320人已学习 课程介绍 Java架构师系列课程是针对有志向架构师发展的广大学员而设置,不管你是工作一到三年,还是三到五年,本课程都会对你产生巨大的...
  • 在SpringBoot项目中通常我们没有处理并发问题,但是使用项目本身还是支持一定的并发量,因为在SpringBoot中内嵌Tomcat容器,而Tomcat在spring-configuration-metadata.json文件中设置了关于并发的默认配置: ...
  • 文件上传 文件上传是Web开发非常重要的操作 结合SpringBoot实现文件上传操作 1、基础上传 SpringBoot采用FileUpload组件实现上传处理,在控制器中可以使用MultipartFile类进行接收。 FileUpload上传 MultipartFile...
  • 上传文件时生成 优点:上传时就已经生成需要的缩略图,读取时不需要再判断,减少cpu运算。 缺点:当缩略图尺寸变化时或新增尺寸时,需要重新生成所有的缩略图。 2.访问时生成 优点:1.当有用户访问时才需要生成,...
  • 现在的视频、游戏等大文件网站的高并发问题越来越突出,如何能在高并发下既节省带宽又能提高速度呢?这就不得不说基于bt内核的p2p技术,该技术支持各种大文件高并发,游戏、视频效果尤其突出,同时下载的人数越多...
  • 例子主要包括IOCP通讯封装、服务端实现日志查看、SCOKET列表、上传、下载、远程文件流、吞吐量协议,用于测试IOCP的性能和压力,最大连接数支持65535个长连接,最高命令交互速度达到250MB/S(使用的是127.0.0.1的...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 64,667
精华内容 25,866
关键字:

高并发文件上传