精华内容
下载资源
问答
  • libcurl实现异步
    千次阅读
    2020-03-21 09:31:43

    1、multi异步接口实现文件上传demo

    《1》、设置easy接口

    #include <curl/curl.h>
    #include <curl/multi.h>
    
    /* Upper bound on connections cached by the multi handle (CURLMOPT_MAXCONNECTS).
       Written as a long literal because curl_multi_setopt reads this option's
       value as a long through varargs; passing a plain int is undefined there. */
    #define  MAX_CONNECT_HANDLE_NUMS  (15L)
    
    /* Per-transfer private data attached to an easy handle with CURLOPT_PRIVATE.
       The completion loop reads it back with CURLINFO_PRIVATE and releases every
       non-NULL member before deleting the struct. */
    typedef struct CustomPrivData
    {
        curl_mime *mime;            /* multipart request body; released with curl_mime_free */
        struct curl_slist *headers; /* request header list; released with curl_slist_free_all */
        FILE* wfd;                  /* optional output file; closed with fclose when non-NULL */
    }CustomPrivData;
    
       curl_global_init(CURL_GLOBAL_ALL);

    /* One multi handle drives every transfer.
       NOTE(review): libcurl handles are not thread-safe. If MultiperformFunc is
       running on another thread, curl_multi_add_handle below races with its
       curl_multi_perform/curl_multi_wait calls -- confirm the caller serializes
       them (e.g. a mutex, or adding handles from the perform thread itself). */
    CURLM * m_MultiHandle  = curl_multi_init();
    /* CURLMOPT_MAXCONNECTS is read as a long through varargs. */
    curl_multi_setopt(m_MultiHandle, CURLMOPT_MAXCONNECTS, (long)MAX_CONNECT_HANDLE_NUMS);

    CURLSH* m_ShareHandle = curl_share_init();
    if(m_ShareHandle)
    {
        /* Share the DNS cache so each easy handle does not resolve the host again. */
        curl_share_setopt(m_ShareHandle, CURLSHOPT_SHARE, CURL_LOCK_DATA_DNS);
    }

    CURL*   EasyCurlHandle = curl_easy_init();

    if(EasyCurlHandle)
    {
        curl_easy_setopt(EasyCurlHandle, CURLOPT_FOLLOWLOCATION, 1L);

        struct curl_slist *headers = NULL;
        char Auth_header[256]={0};
        snprintf(Auth_header, sizeof(Auth_header), "Authorization:Bearer %s","test21334242");
        headers = curl_slist_append(headers, Auth_header);

        /* Value-initialized: every member starts out NULL. */
        CustomPrivData* PrivData = new CustomPrivData();

        curl_easy_setopt(EasyCurlHandle, CURLOPT_HTTPHEADER, headers);
        PrivData->headers = headers;

        /* Keep resolved addresses for 120 s; the option value must be a long. */
        curl_easy_setopt(EasyCurlHandle, CURLOPT_DNS_CACHE_TIMEOUT, 120L);
        curl_easy_setopt(EasyCurlHandle, CURLOPT_SHARE, m_ShareHandle);

        curl_easy_setopt(EasyCurlHandle, CURLOPT_URL, url);
        curl_easy_setopt(EasyCurlHandle, CURLOPT_NOSIGNAL, 1L);
        curl_easy_setopt(EasyCurlHandle, CURLOPT_VERBOSE, 1L);
        curl_easy_setopt(EasyCurlHandle, CURLOPT_DEFAULT_PROTOCOL, "https");
        /* No CURLOPT_CUSTOMREQUEST "POST": CURLOPT_MIMEPOST already makes the request
           a POST, and a custom method would also be forced onto redirected requests. */

        curl_mime *mime = curl_mime_init(EasyCurlHandle);
        if(mime)
        {
            curl_mimepart *part = curl_mime_addpart(mime);
            curl_mime_name(part, "file");
            curl_mime_filedata(part, file_path);
            curl_easy_setopt(EasyCurlHandle, CURLOPT_MIMEPOST, mime);
        }
        PrivData->mime = mime;

        /* Attach the private data so the completion loop can identify this transfer
           (CURLINFO_PRIVATE) and release its resources. */
        curl_easy_setopt(EasyCurlHandle, CURLOPT_PRIVATE, PrivData);

        /* headers and mime must stay alive until the transfer completes; they are
           freed with PrivData after the handle leaves the multi stack. */
        if(curl_multi_add_handle(m_MultiHandle, EasyCurlHandle) != CURLM_OK)
        {
            /* The handle never entered the multi stack: release everything here. */
            curl_easy_cleanup(EasyCurlHandle);
            curl_mime_free(mime);
            curl_slist_free_all(headers);
            delete PrivData;
        }
    }
    
    

    以上有两点注意:
    1)、使用share接口实现DNS共享,加快处理速度;
    2)、使用curl_easy_setopt(EasyCurlHandle, CURLOPT_PRIVATE, PrivData)设置私有数据,用于curl_multi_info_read调用之后区分不同的easyhandle;

    《2》、开启线程处理函数

    void* MultiperformFunc(void* ptr)
    {
        m_IsStillRuning = true;
        int msgs_left = 0;
        int still_running = 0;
    
        while(m_IsStillRuning)
        {
            CURLMcode mcode = curl_multi_perform(m_MultiHandle, &still_running);
            while(mcode == CURLM_CALL_MULTI_PERFORM)
            {
                cout << "................ CURLM_CALL_MULTI_PERFORM ..............";
                mcode = curl_multi_perform(m_MultiHandle, &still_running);
            }
            //LOG_DEBUG << "still_running: " << still_running;
    
            CURLMsg *msg = NULL;
            int numfds = 0;
            /* wait for activity, timeout or "nothing" */
            int res = curl_multi_wait(m_MultiHandle, NULL, 0, 1000, &numfds);
            
            while((msg = curl_multi_info_read(m_MultiHandle, &msgs_left)))
            {
                if(msg->msg == CURLMSG_DONE)
                {
                    CustomPrivData* PrivData = NULL;
                    CURL *e = msg->easy_handle;
                    curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, &PrivData);
                    //fprintf(stderr, "R: %d - %s <%s>\n",msg->data.result, curl_easy_strerror(msg->data.result), wfd);
                    if(PrivData)
                    {
                    	/* 处理每个easyhandle的私有数据 */
                        if(PrivData->wfd)
                            fclose(PrivData->wfd);
                        if(PrivData->headers)
                            curl_slist_free_all(PrivData->headers);
                        if(PrivData->mime)
                            curl_mime_free(PrivData->mime);
    
                        delete PrivData;
                        PrivData = NULL;
                    }
                    
                    /*当一个easy handler传输完成,此easy handler仍然仍然停留在multi stack中,
                    调用curl_multi_remove_handle将其从multi stack中移除,然后调用curl_easy_cleanup将其关闭*/
                    curl_multi_remove_handle(m_MultiHandle, e);
                    curl_easy_cleanup(e);
                }
                else
                {
                    //fprintf(stderr, "E: CURLMsg (%d)\n", msg->msg);
                    cout << "curl Error: " << msg->msg;
                }
            }
    
            usleep(30*1000);
        }
    }
    

    2、参考

    《1》、libcurl采用curl_multi_perform() + curl_multi_wait()方式实现异步高性能发送数据的方法
    《2》、libcurl的share interface与curl_easy_perform的性能

    更多相关内容
  • 最近在工作中用到了libcurl请求大量网页,感觉使用多线程的方式线程数太高的话会影响性能,然后就写了一个简单的基于libcurl和reactor模型的框架,实现单线程高并发,不需要考虑竞争条件的问题,提高性能的同时也能...
  • 这个小程序只是简单的使用线程池实现libcurl异步请求功能,很多地方还需要优化,之所以厚颜上传到这里,主要有三个目的。 一个是回报这些天在CSDN上得到的帮助,希望可以帮助到后来的人; 二是感于libcurl编译...
  • 用于文件上传下载和发送http get,post命令.libcurl不是一个简单的api,是一组api实现的模块,有自己的使用steps. 默认curl是get url 网页,与callback write function连动,写入文件。 2) 两种方案: curl_multi_socket...

    目录

    1 背景知识
    2 libcurl 基础知识
    3  libcurl两种模式
    4 libcurl实例分析

    正文
    1 背景知识:

    1.1 基本网络通信cs模式,select 框架,网上例子很多.下面只介绍epoll的难点.其他内容请自行搜索.

    1.2 epoll 用法

    1.2.1  基础知识:

    1) epoll in/out 与socket io 缓冲区关系?
    socket 可读可写是指io 缓冲区的情况,这层由内核控制.socket io 对应的epoll in/out 是应用层.
    epoll 通常采用 ET(边缘触发)模式。ET 模式只在 fd 的状态发生变化时才通知,比如:读缓冲区从无数据变为有数据、写缓冲区从满变为不满时才会通知。
    所以读的时候要用 while 循环不停地读,直到把缓冲区读空;下次数据到来时,缓冲区又由无到有,会再次触发 EPOLLIN。
    写的时候同样用 while 循环不停地写,直到缓冲区满、write 返回 EAGAIN;随后内核会把“发送缓冲区”中的数据发送出去。
    “发送缓冲区”由满变为不满时,会再次触发 EPOLLOUT,进入 EPOLLOUT 分支,开始下一轮 while 写。
    如果缓冲区没有写满、数据就已经全部写完,则不会再次进入 EPOLLOUT 分支。

    2) epoll out的使用时机?

    在本端准备 write 之前,通过 epoll_ctl 为该 fd 注册 EPOLLOUT 事件。
    epoll in 是被动监听接收,epoll out是主动

    展开全文
  • 最近在工作中用到了libcurl请求大量网页,感觉使用多线程的方式线程数太高的话会影响性能,然后就写了一个简单的基于libcurl和reactor模型的框架,实现单线程高并发,不需要考虑竞争条件的问题,提高性能的同时也能...
  • tokio-curl:基于 libcurl、以 Future 形式提供的异步 HTTP 客户端实现。用法:首先,将其添加到您的 Cargo.toml:[dependencies] tokio-curl = "0.1"。接下来,将其添加到您的 crate 中:extern crate tokio_curl; 许可证:该项目...
  • 最近在工作中用到了libcurl请求大量网页,感觉使用多线程的方式线程数太高的话会影响性能,然后就写了一个简单的基于libcurl和reactor模型的框架,实现单线程高并发,不需要考虑竞争条件的问题,提高性能的同时也能...
  • 基于libcurl实现http post支持并发,异步 方便新手学习
  • 使用libcurl异步发送http请求

    千次阅读 2017-10-09 21:02:46
    在工作中需要完成一个工具,该工具主要的用途就是向指定的服务器和端口发送http请求,为了提高性能,采用多线程的方式进行,同时采用libcurl异步形式。代码如下,在其中添加一些注释来记录写代码中遇到的问题。#...

    在工作中需要完成一个工具,该工具主要的用途就是向指定的服务器和端口发送http请求,为了提高性能,采用多线程的方式进行,同时采用libcurl的异步形式。代码如下,在其中添加一些注释来记录写代码中遇到的问题。

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <unistd.h>
    #include <stdarg.h>
    #include <pthread.h>
    #include <signal.h>
    
    #include <curl/curl.h>
    
    #include <iostream>
    #include <sstream>
    #include <string>
    
    using namespace std;
    
    // Query rows read from the input file. Kept at file scope: at ~200 MB it would
    // overflow the stack of any function that declared it locally.
    char globalQueryContext[100000][2048];
    string globalLogFile     = "access.log";
    string globalHost        = "i3114.se.shyc2.qihoo.net";
    string globalPort        = "6351";
    string globalQueryFile   = "query.txt";
    // Raised by the SIGINT/SIGTERM handler and the timer thread, polled by workers.
    // volatile sig_atomic_t is the object type the C/C++ standards allow a signal
    // handler to write; volatile bool carried no such guarantee.
    volatile sig_atomic_t globalStop = false;
    static int  globalThreadNumber  = 3;
    
    /* SIGINT/SIGTERM handler: only raises the stop flag that the worker loops poll. */
    void sig_handle(int /* signo */)
    {
        globalStop = 1;
    }
    
    /* Work description handed to one worker thread, plus its result slot. */
    struct thread_arg
    {
        int  sequence;          // worker index; -1 until assigned
        int  successRequest;    // filled in by the worker when it finishes
        int  totalQueryNumber;  // number of queries split across all workers
        thread_arg()
            : sequence(-1)
            , successRequest(0)
            , totalQueryNumber(0)
        {
        }
    };
    
    /* CURLOPT_WRITEFUNCTION callback: appends each received body chunk verbatim to
       the stringstream passed through CURLOPT_WRITEDATA.
       libcurl splits a body at arbitrary points, so no separator may be added
       between chunks (the old `<< endl` inserted a newline after every chunk and
       corrupted the collected response).
       @return the number of bytes consumed; any other value makes libcurl abort. */
    size_t write_response(void *contents, size_t size, size_t nmemb, void *stream )
    {
        const size_t bytes = size * nmemb;
        stringstream *sink = static_cast<stringstream*>(stream);
        sink->write(static_cast<const char*>(contents), static_cast<streamsize>(bytes));
        return bytes;
    }
    
    /* Builds "http://<globalHost>:<globalPort>/mod_qsrch/warmup?kw=<queryContext>"
       into completedQuery and echoes it with its length.
       @param completedQuery destination buffer of `size` bytes.
       @param queryContext   raw query text, appended without URL-escaping.
       @param size           capacity of completedQuery; defaults to the 2048-byte rows
                             every caller passes.
       @return completedQuery, or NULL when an argument is NULL or the URL does not
               fit (the buffer is then left empty so libcurl rejects it instead of
               sending a truncated URL). */
    char *getCompletedQuery(char* const completedQuery, const char *queryContext, size_t size = 2048)
    {
        if(completedQuery == NULL || queryContext == NULL || size == 0)
            return NULL;

        // snprintf bounds the write; the old strcat chain overflowed the row when
        // the query itself was close to 2047 characters long.
        int len = snprintf(completedQuery, size, "http://%s:%s/mod_qsrch/warmup?kw=%s",
                           globalHost.c_str(), globalPort.c_str(), queryContext);
        if(len < 0 || (size_t)len >= size)
        {
            completedQuery[0] = '\0';
            cout << "query too long, skipped: " << queryContext << endl;
            return NULL;
        }

        cout <<  string(completedQuery) << ":" << strlen(completedQuery) << endl;
        return completedQuery;
    }
    
    /* Points `curl` at completedQuery and routes the response body into `response`
       through write_response. CURLOPT_NOSIGNAL keeps libcurl from using signals
       for timeouts. */
    void setCurlEasy(CURL *curl, const char *completedQuery, stringstream& response)
    {
        stringstream *sink = &response;
        curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1L);
        curl_easy_setopt(curl, CURLOPT_URL, completedQuery);
        curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, write_response);
        curl_easy_setopt(curl, CURLOPT_WRITEDATA, sink);
    }
    
    /* Prints "OK" or libcurl's description of `code`.
       @return true only for CURLE_OK. */
    bool dealWithCurlCode(CURLcode code)
    {
        const bool ok = (code == CURLE_OK);
        cout << (ok ? string("OK") : string(curl_easy_strerror(code))) << endl;
        return ok;
    }
    
    bool asyncSetCurlEasy(const char queryContext[][2048], 
                          char completedQuerys[][2048],
                          stringstream responses[], 
                          int queryBeginPosition,
                          size_t queryNumber,
                          CURLM *curlm
                          )
    {
        if(completedQuerys == NULL || responses == NULL || curlm == NULL)
            return false;
    
        for(size_t i = 0; !globalStop &&i < queryNumber; ++i)
        {
            CURL *curl = curl_easy_init(); 
            if(curl)
            {
                getCompletedQuery(completedQuerys[i], queryContext[queryBeginPosition + i]);        
                setCurlEasy(curl, completedQuerys[i], responses[i]);         
                curl_multi_add_handle(curlm, curl); 
            }
            else
                return false;
        }
    
        return true;
    }
    
    /* Drains curlm's completion queue. For each finished transfer it logs the URL,
       HTTP status and libcurl result, then removes and destroys the easy handle.
       @return number of transfers that finished with CURLE_OK (0 for a NULL handle).
               The old version counted every finished transfer, failures included. */
    int asyncDealWithCurlCode(CURLM *curlm)
    {
        if(curlm == NULL) 
            return 0;

        int leftMsg = 0;
        int sucessCurl = 0;
        CURLMsg* msg = NULL;

        while(!globalStop && (msg = curl_multi_info_read(curlm, &leftMsg)) != NULL)
        {
            if(msg->msg != CURLMSG_DONE)
                return sucessCurl;

            // Copy now: msg must not be touched after curl_multi_remove_handle.
            CURL *easy = msg->easy_handle;
            CURLcode result = msg->data.result;
            if(result == CURLE_OK)
                sucessCurl++;

            // CURLINFO_RESPONSE_CODE writes a long; an int target is overrun on LP64.
            long httpStatusCode = 0;
            curl_easy_getinfo(easy, CURLINFO_RESPONSE_CODE, &httpStatusCode);
            char *effectiveUrl = NULL;
            curl_easy_getinfo(easy, CURLINFO_EFFECTIVE_URL, &effectiveUrl);
            // Streaming a NULL char* is undefined behavior.
            cout << "url: " << (effectiveUrl ? effectiveUrl : "(unknown)") << " status:  " << httpStatusCode << "  "
                << curl_easy_strerror(result)  << endl;

            curl_multi_remove_handle(curlm, easy);
            curl_easy_cleanup(easy);
        }

        return sucessCurl;
    }
    
    /* Sends queryNumber requests starting at queryContexts[queryBeginPosition]
       through one multi handle and waits until all finish or globalStop is raised.
       @return number of transfers that asyncDealWithCurlCode reports as successful
               (0 on setup failure). */
    int asyncSendRequestAndGetResponse(const char queryContexts[][2048], 
                                       int queryBeginPosition, 
                                       size_t queryNumber)
    {
        CURLM *curlm = curl_multi_init();
        if(curlm == NULL)
        {
            cout << "curl_multi_init error" << endl;
            return 0;
        }

        // Heap buffers: as VLAs these took queryNumber * 2048 bytes of stack per
        // thread (tens of MB for large inputs), and VLAs are not standard C++.
        char (*completedQuerys)[2048] = new char[queryNumber][2048];
        stringstream *responses = new stringstream[queryNumber];

        int sucessRequest = 0;
        if(!asyncSetCurlEasy(queryContexts, completedQuerys, responses, queryBeginPosition, queryNumber, curlm))
        {
            cout << "asyncSetCurlEasy error"<< endl;
        }
        else
        {
            int runningCurls = 0;
            do{
                curl_multi_wait(curlm, NULL, 0, 2000, NULL);
                curl_multi_perform(curlm, &runningCurls);
            }while(runningCurls > 0 && !globalStop);

            sucessRequest = asyncDealWithCurlCode(curlm);
        }

        // The response buffers are write targets of the transfers, so they are
        // released only after the multi handle is gone (the old error path also
        // leaked curlm).
        curl_multi_cleanup(curlm);
        delete[] responses;
        delete[] completedQuerys;

        return sucessRequest;
    }
    
    bool sendRequestAndGetResponse(const char* queryContext)
    {
        CURL *curl = curl_easy_init();
    
        if(curl)
        {
            char completedQuery[2048] = {0};
            stringstream response;
            getCompletedQuery(completedQuery, queryContext);        
            setCurlEasy(curl, completedQuery, response); 
    
            CURLcode code = curl_easy_perform(curl);
            curl_easy_cleanup(curl);
            return dealWithCurlCode(code);
        }
        else
            return false;
    }
    
    /* Reads up to queryNumber lines of `path` into queryContexts (one 2048-byte row
       each, longer lines are split by fgets) and strips the line ending.
       @return number of rows filled; 0 if the file cannot be opened. */
    int getQueryContext(const string& path, char queryContexts[][2048], int queryNumber)
    {
        FILE* fd = fopen(path.c_str(), "r");
        if(!fd)
        {
            cout << "open file " << path << " failed!"<< endl;
            return 0;
        }

        int index = 0;
        while(!globalStop && index < queryNumber && fgets(queryContexts[index], 2048, fd) != NULL)
        {
            // Cut at the first '\r' or '\n' only if one is present. The old
            // `[strlen - 1] = '\0'` dropped the last character of a final line with
            // no newline and wrote to [-1] when a row started with a NUL byte.
            queryContexts[index][strcspn(queryContexts[index], "\r\n")] = '\0';
            cout << "query" << index << ":" << queryContexts[index] << endl; 
            index++;
        }

        fclose(fd);
        return index;
    }
    
    void *doWarmBySendQueryFormFile(void *arg)
    {
        struct thread_arg *queryMsg = (thread_arg*)arg; 
        int averageQueryNumber = queryMsg->totalQueryNumber / globalThreadNumber;
        int queryBeginPosition = queryMsg->sequence * averageQueryNumber;
        if(queryMsg->sequence == globalThreadNumber - 1) 
            averageQueryNumber = queryMsg->totalQueryNumber - averageQueryNumber * queryMsg->sequence;
        cout << "thread " << queryMsg->sequence << " query begin position is " << queryBeginPosition << " query number is " << averageQueryNumber << endl;
        queryMsg->successRequest = asyncSendRequestAndGetResponse(globalQueryContext, queryBeginPosition, averageQueryNumber);
    
        return NULL;
    }
    
    /* Prints the command-line synopsis accepted by parseParameters. */
    void usage(const char* pname)
    {
        // Separator after the program name: it used to run straight into "-p".
        cout << pname << " "
               "-p port "
               "-h host "
               "-q input_file "
               "-l log_file "
               "-t thread_number" << endl;
    }
    
    /* Watchdog thread. It detaches itself and blocks every signal except SIGALRM,
       so process-directed signals such as SIGINT/SIGTERM go to other threads. It
       then sleeps 10 minutes and raises globalStop. */
    void *time_worker(void *arg)
    {
        (void)arg;
        pthread_detach(pthread_self());
        struct timespec delay;
        delay.tv_sec = 10 * 60;
        delay.tv_nsec = 0;

        sigset_t mask;
        sigfillset(&mask);
        // Was `sigfillset(&mask, SIGALRM)` (wrong arity, did not compile); the intent
        // is "block everything but SIGALRM".
        sigdelset(&mask, SIGALRM);
        pthread_sigmask(SIG_BLOCK, &mask, NULL);

        // Was `nanosleep(%delay, ...)`, a syntax error.
        nanosleep(&delay, NULL);
        globalStop = true;
        cout << "time to exit" << endl;

        return NULL;
    }
    
    /* Parses -p port, -h host, -q query file, -l log file and -t thread count into
       the corresponding globals.
       @return false (after printing usage) on an unknown option or an invalid
               thread count. */
    bool parseParameters(int argc, char *argv[])
    {
        int c;
        while((c = getopt(argc, argv, "h:p:q:l:t:")) != -1)
        {
            switch(c)
            {
                case 'p':
                    globalPort = optarg;
                    break;
                case 'h':
                    globalHost = optarg;
                    break;
                case 'q':
                    globalQueryFile = optarg;
                    break;
                case 'l':
                    globalLogFile = optarg;
                    break;
                case 't':
                    {
                        // atoi() maps garbage to 0 and accepts negatives; a negative
                        // count makes `new pthread_t[n]` throw in main.
                        char *end = NULL;
                        long n = strtol(optarg, &end, 10);
                        if(end == optarg || *end != '\0' || n <= 0 || (long)(int)n != n)
                        {
                            cout << "invalid thread number: " << optarg << endl;
                            usage(argv[0]);
                            return false;
                        }
                        globalThreadNumber = (int)n;
                    }
                    break;
                default:
                    usage(argv[0]);
                    return false;
            }
        }

        return true;
    }
    
    /* Loads the queries, splits them across globalThreadNumber worker threads,
       starts the 10-minute watchdog, waits for the workers and prints the number
       of successful requests. */
    int main(int argc, char* argv[])
    {
        CURLcode code = curl_global_init(CURL_GLOBAL_ALL); 
        if(code != CURLE_OK)
        {
            cout << "curl_global_init error" << endl; 
            exit(-1);
        }

        if(!parseParameters(argc, argv))
        {
            cout << "parse parameters error" << endl;
            exit(-1);
        }

        signal(SIGINT, sig_handle);
        signal(SIGTERM, sig_handle);

        // Capacity taken from the array itself so the two cannot drift apart.
        const int maxQueries = (int)(sizeof(globalQueryContext) / sizeof(globalQueryContext[0]));
        int queryNumber = getQueryContext(globalQueryFile, globalQueryContext, maxQueries); 
        if(!queryNumber)
        {
            cout << "read query from file failed" << endl;
            exit(-1);
        }

        // new[] reports failure by throwing std::bad_alloc, never by returning NULL,
        // so the former NULL checks were dead code.
        pthread_t *tids = new pthread_t[globalThreadNumber];
        thread_arg *thr_args = new thread_arg[globalThreadNumber];

        for(int i = 0; i < globalThreadNumber; ++i)
        {
            thr_args[i].totalQueryNumber = queryNumber;
            thr_args[i].sequence = i;
            if(pthread_create(&tids[i], NULL, doWarmBySendQueryFormFile, &thr_args[i]) != 0)    
            {
                cout << "create curl thread" << i << " error" << endl; 
                exit(-1);
            } 
        }

        // Was declared as `thread_t`, which is not a pthreads type and did not compile.
        pthread_t twid;
        if(pthread_create(&twid, NULL, time_worker, NULL) != 0)
        {
            cout << "create time worker thread error"<< endl; 
            exit(-1);
        }

        int totalSuccessRequest = 0;
        for(int i = 0; i < globalThreadNumber; ++i)
        {
            pthread_join(tids[i], NULL);   // wait for every worker before exiting
            totalSuccessRequest += thr_args[i].successRequest;
        }
        cout << "total query: " << queryNumber << " success request: " << totalSuccessRequest << endl;

        delete[] tids;
        delete[] thr_args;
        curl_global_cleanup();
        return 0;
    }
    
    
    

    写完上述代码,发现其中还是有一些问题:

    1. 代码的书写规范有问题
    2. 没有对请求成功和失败结果的统计
    3. 函数行数太多
    4. 可以将上述方法封装成一个类
    5. 在异步的同时也可以控制并发

    后来通过改进就变成了下面的代码:

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <unistd.h>
    #include <stdarg.h>
    #include <pthread.h>
    #include <signal.h>
    
    #include <curl/curl.h>
    
    #include <iostream>
    #include <vector>
    #include <string>
    #include <fstream>
    
    using namespace std;
    
    // Full request URLs, built once in main (splitQuery) and read by every worker.
    vector<string> globalCompletedQuerys;

    string globalHost               = "localhost";
    string globalPort               = "6351";
    string globalQueryFile          = "query.txt";
    // Raised by the SIGINT/SIGTERM handler and the timer thread, polled by workers.
    // volatile sig_atomic_t is the object type the C/C++ standards allow a signal
    // handler to write; volatile bool carried no such guarantee.
    volatile sig_atomic_t globalStop = false;

    static int  globalThreadNumber  = 3;
    static int  globalConcurrence   = 1;
    static long globalQueryNumber   = 1;
    
    /* SIGINT/SIGTERM handler: raises the stop flag and announces it.
       Only async-signal-safe calls are allowed in a handler, so the message goes
       out through write(2) instead of cout (the interrupted thread may be in the
       middle of an iostream operation). */
    void sigHandle(int)
    {
        globalStop = true;
        static const char msg[] = "stop \n";
        ssize_t written = write(STDOUT_FILENO, msg, sizeof(msg) - 1);
        (void)written;   // nothing useful can be done about a failed write here
    }
    
    /* Per-worker counters of successful and failed requests. main gives every
       worker its own instance, so the counters are not synchronized. */
    class statisticsRequest
    {
        private:
            int failedRequest;
            int successRequest;
        public:
            statisticsRequest() : failedRequest(0), successRequest(0) {}
            void increaseFailedRequest(){ failedRequest++; }
            void increaseSucessRequest(){ successRequest++; }
            // const so the counters can be read through a const reference or pointer.
            int getFailedRequest() const { return failedRequest; }
            int getSuccessRequest() const { return successRequest; }
    };
    
    /* Work description handed to one worker thread. */
    struct threadArg
    {
        int  sequence;                  // worker index; -1 until assigned
        int  concurrence;               // max requests this worker keeps in flight
        int  totalQueryNumber;          // number of queries split across all workers
        statisticsRequest * statsReq;   // this worker's counters (not owned)
        threadArg()
            : sequence(-1)
            , concurrence(1)
            , totalQueryNumber(0)
            , statsReq(NULL)
        {
        }
    };
    
    size_t writeResponse(void *contents, size_t size, size_t nmemb, void *stream)
    {
        string data((const char *)contents, (size_t)size * nmemb);
        statisticsRequest *request = (statisticsRequest *)stream;
    
        if(data.substr(0, 9) == "rsp_ec: 0")
        {
            request->increaseSucessRequest();
        }
        else if(data.substr(0, 6) == "rsp_ec" && data.substr(0, 9) != "rsp_ec: 0")
        {
            request->increaseFailedRequest();
        }
    
        return size * nmemb;
    }
    
    /* Overwrites completedQuery with
       "http://<globalHost>:<globalPort>/mod_qsrch/warmup?kw=<queryContext>".
       queryContext is appended as-is (no URL escaping). */
    void getCompletedQuery(const string& queryContext, string& completedQuery)
    {
        completedQuery.clear();
        completedQuery.append("http://")
                      .append(globalHost)
                      .append(":")
                      .append(globalPort)
                      .append("/mod_qsrch/warmup?kw=")
                      .append(queryContext);
    }
    
    /* Sets up `curl` for a body-less request (CURLOPT_NOBODY) to completedQuery.
       Every response header line is passed to writeResponse, with `response` as its
       user pointer. */
    void setCurlEasy(CURL *curl, const string &completedQuery, statisticsRequest *response)
    {
        void *headerSink = static_cast<void *>(response);
        const char *url = completedQuery.c_str();

        curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1L);
        curl_easy_setopt(curl, CURLOPT_NOBODY, 1L);
        curl_easy_setopt(curl, CURLOPT_URL, url);
        curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, writeResponse);
        curl_easy_setopt(curl, CURLOPT_HEADERDATA, headerSink);
    }
    
    bool asyncSetCurlEasy(const vector<string>& completedQuerys, 
                          int queryBeginPosition,
                          size_t queryNumber,
                          CURLM *curlm,
                          statisticsRequest* statsReq)
    {
        if(completedQuerys.empty() || statsReq == NULL || curlm == NULL)
            return false;
    
        for(size_t i = 0; !globalStop && i < queryNumber; ++i)
        {
            CURL *curl = curl_easy_init(); 
            if(curl)
            {
                setCurlEasy(curl, completedQuerys[queryBeginPosition + i], statsReq);
                curl_multi_add_handle(curlm, curl); 
            }
            else
                return false;
        }
    
        return true;
    }
    
    /* Drains curlm's completion queue and detaches and destroys every finished
       easy handle. The old loop never removed them, so each request leaked its
       easy handle (and connection state) for the lifetime of the multi handle.
       Success/failure counting happens in the header callback, not here. */
    void asyncDealWithCurlCode(CURLM *curlm)
    {
        if(curlm == NULL) 
            return;

        int leftMsg = 0;
        CURLMsg* msg = NULL;

        while(!globalStop && (msg = curl_multi_info_read(curlm, &leftMsg)) != NULL)
        {
            if(msg->msg != CURLMSG_DONE)
                continue;

            // Copy the handle first: msg is invalid after curl_multi_remove_handle.
            CURL *easy = msg->easy_handle;
            curl_multi_remove_handle(curlm, easy);
            curl_easy_cleanup(easy);
        }
    }
    
    /* Appends, in order, the full request URL for every entry of queryContexts to
       completedQuerys. */
    void splitQuery(const vector<string> &queryContexts, vector<string> &completedQuerys)
    {
        for(size_t i = 0; i < queryContexts.size(); ++i)
        {
            string url;
            getCompletedQuery(queryContexts[i], url);
            completedQuerys.push_back(url);
        }
    }
    
    void asyncSendRequestAndGetResponse(const vector<string>& completedQuerys, 
                                       int sequence,
                                       int queryBeginPosition, 
                                       int concur,
                                       int queryNumber,
                                       statisticsRequest *statsReq)
    {
        CURLM *curlm = curl_multi_init();
        if(curlm == NULL) 
        {
            cout << "init curl multi failed" << endl; 
            return;
        }
    
        if(concur > queryNumber)
            concur = queryNumber;
    
        int runningCurls = 0, queryStart = queryBeginPosition; 
        int queryEndPosition = queryBeginPosition + queryNumber;
        int sendedRequest = queryBeginPosition - queryStart;
        int receivedResponse = statsReq->getFailedRequest() + statsReq->getSuccessRequest();
    
        asyncSetCurlEasy(completedQuerys, queryBeginPosition, concur, curlm, statsReq);
        queryBeginPosition += concur;
    
        do{
            curl_multi_perform(curlm, &runningCurls);
            asyncDealWithCurlCode(curlm); 
    
            sendedRequest = queryBeginPosition - queryStart;
            receivedResponse = statsReq->getFailedRequest() + statsReq->getSuccessRequest();
    
            if(queryBeginPosition != queryEndPosition && sendedRequest - receivedResponse < concur)
            {
                int curlNumberToAdd = concur - (sendedRequest - receivedResponse);
    
                if(queryBeginPosition + curlNumberToAdd < queryEndPosition)
                {
                    asyncSetCurlEasy(completedQuerys, queryBeginPosition, curlNumberToAdd, curlm, statsReq);
                    queryBeginPosition += curlNumberToAdd;
                }
                else if(queryBeginPosition < queryEndPosition)
                {
                    asyncSetCurlEasy(completedQuerys, queryBeginPosition, queryEndPosition - queryBeginPosition, curlm, statsReq);
                    queryBeginPosition += queryEndPosition - queryBeginPosition;
                }
            }
    
            curl_multi_wait(curlm, NULL, 0, 200000, NULL);  
    
            //cout << pthread_self() << " receivedResponse: " << receivedResponse << "queryNumber :" << queryNumber << endl;  
        }while(receivedResponse < queryNumber && !globalStop);
    
        curl_multi_cleanup(curlm); 
    
        return;
    }
    
    /* Reads up to `num` whitespace-separated tokens from `path` into queryContexts.
       operator>> splits on any whitespace, so a line containing spaces yields several
       queries. Stops early if globalStop is raised.
       @return number of tokens appended; 0 if the file cannot be read. */
    int getQueryContext(const string& path, vector<string>& queryContexts, int num)
    {
        ifstream input(path.c_str());
        string token;
        int count = 0;

        for(; !globalStop && count < num && (input >> token); )
        {
            if(token.empty())
                continue;
            queryContexts.push_back(token);
            ++count;
        }

        input.close();
        return count;
    }
    
    void *doWarmBySendQueryFormFile(void *arg)
    {
        struct threadArg *queryMsg = (threadArg*)arg; 
        int averageQueryNumber = queryMsg->totalQueryNumber / globalThreadNumber;
        int queryBeginPosition = queryMsg->sequence * averageQueryNumber;
        if(queryMsg->sequence == globalThreadNumber - 1) 
            averageQueryNumber = queryMsg->totalQueryNumber - averageQueryNumber * queryMsg->sequence;
        //cout << "thread " << queryMsg->sequence << " query begin position is " << queryBeginPosition << " query number is " << averageQueryNumber << "concurrence is " << queryMsg->concurrence << endl;
         asyncSendRequestAndGetResponse(globalCompletedQuerys, queryMsg->sequence, queryBeginPosition, queryMsg->concurrence,  averageQueryNumber, queryMsg->statsReq);
    
        return NULL;
    }
    
    /* Prints the command-line synopsis accepted by parseParameters. */
    void usage(const char* pname)
    {
        // Adds a separator after the program name, plus the -n / -c options that
        // parseParameters accepts but the help text never mentioned.
        cout << pname << " "
               "-p port "
               "-h host "
               "-i input_file "
               "-t thread_number "
               "-n query_number "
               "-c concurrence" << endl;
    }
    
    /* Watchdog thread: detaches, blocks every signal except SIGALRM, sleeps ten
       minutes, then raises globalStop and reports it. */
    void *timeWorker(void *arg)
    {
        pthread_detach(pthread_self());

        struct timespec tenMinutes;
        tenMinutes.tv_sec = 10 * 60;
        tenMinutes.tv_nsec = 0;

        sigset_t blocked;
        sigfillset(&blocked);
        sigdelset(&blocked, SIGALRM);
        pthread_sigmask(SIG_BLOCK, &blocked, NULL);

        nanosleep(&tenMinutes, NULL);

        globalStop = true;
        cout << "time to exit" << endl;
        return NULL;
    }
    
    /* Parses a strictly positive decimal int option value.
       @return false on an empty/garbage string, trailing characters, zero or
               negative values, or values outside the int range. */
    static bool parsePositiveOption(const char *text, int *out)
    {
        char *end = NULL;
        long value = strtol(text, &end, 10);
        if(end == text || *end != '\0' || value <= 0 || (long)(int)value != value)
            return false;
        *out = (int)value;
        return true;
    }

    /* Parses -p port, -h host, -i query file, -t threads, -n query count and
       -c per-thread concurrency into the corresponding globals.
       @return false (after printing usage) on an unknown option or an invalid
               numeric value. atoi() used to turn garbage into 0 and accept
               negatives, which later made `new T[n]` throw. */
    bool parseParameters(int argc, char *argv[])
    {
        int c;
        while((c = getopt(argc, argv, "h:p:i:t:n:c:")) != -1)
        {
            switch(c)
            {
                case 'p':
                    globalPort = optarg;
                    break;
                case 'h':
                    globalHost = optarg;
                    break;
                case 'i':
                    globalQueryFile = optarg;
                    break;
                case 't':
                case 'n':
                case 'c':
                    {
                        int value = 0;
                        if(!parsePositiveOption(optarg, &value))
                        {
                            cout << "invalid value for -" << (char)c << ": " << optarg << endl;
                            usage(argv[0]);
                            return false;
                        }
                        if(c == 't')
                            globalThreadNumber = value;
                        else if(c == 'n')
                            globalQueryNumber = value;
                        else
                            globalConcurrence = value;
                    }
                    break;
                default:
                    usage(argv[0]);
                    return false;
            }
        }

        return true;
    }
    
    /* Adds each of the globalThreadNumber workers' counters onto
       *totalSuccessRequest and *totalFailedRequest (accumulates; does not reset). */
    void getStatisticsRequest(statisticsRequest statsReqs[], int *totalSuccessRequest, int *totalFailedRequest)
    {
        for(int worker = 0; worker < globalThreadNumber; ++worker)
        {
            statisticsRequest &stats = statsReqs[worker];
            *totalSuccessRequest += stats.getSuccessRequest();
            *totalFailedRequest  += stats.getFailedRequest();
        }
    }
    
    /* Loads up to -n queries, builds their URLs, runs globalThreadNumber workers
       (each with -c requests in flight) under a 10-minute watchdog, and prints the
       aggregated success/failure counts. */
    int main(int argc, char* argv[])
    {
        CURLcode code = curl_global_init(CURL_GLOBAL_ALL); 
        if(code != CURLE_OK)
        {
            cout << "curl_global_init error" << endl; 
            exit(-1);
        }

        if(!parseParameters(argc, argv))
        {
            cout << "parse parameters error" << endl;
            exit(-1);
        }

        signal(SIGINT, sigHandle);
        signal(SIGTERM, sigHandle);

        vector<string> queryContexts;
        int queryNumber = getQueryContext(globalQueryFile, queryContexts, globalQueryNumber); 
        if(!queryNumber)
        {
            cout << "read query from file failed" << endl;
            exit(-1);
        }
        splitQuery(queryContexts, globalCompletedQuerys);

        // new[] reports failure by throwing std::bad_alloc, never by returning NULL,
        // so the former NULL checks were dead code.
        pthread_t *tids = new pthread_t[globalThreadNumber];
        threadArg *thr_args = new threadArg[globalThreadNumber]();
        statisticsRequest *statsReqs = new statisticsRequest[globalThreadNumber]();

        for(int i = 0; i < globalThreadNumber; ++i)
        {
            thr_args[i].sequence = i;
            thr_args[i].statsReq = &statsReqs[i];
            thr_args[i].concurrence = globalConcurrence;
            // Split the queries actually read. -n is only an upper bound; splitting
            // by it indexed past the end of globalCompletedQuerys when the file was
            // shorter than requested.
            thr_args[i].totalQueryNumber = queryNumber;

            if(pthread_create(&tids[i], NULL, doWarmBySendQueryFormFile, &thr_args[i]) != 0)    
            {
                cout << "create curl thread" << i << " error" << endl; 
                exit(-1);
            } 
        }

        pthread_t twid;
        if(pthread_create(&twid, NULL, timeWorker, NULL) != 0)
        {
            cout << "create time worker thread error"<< endl; 
            exit(-1);
        }

        for(int i = 0; i < globalThreadNumber; ++i)
        {
            pthread_join(tids[i], NULL);
        }

        int totalSuccessRequest = 0;
        int totalFailedRequest  = 0;
        getStatisticsRequest(statsReqs, &totalSuccessRequest, &totalFailedRequest);
        cout << "total request : " << totalSuccessRequest + totalFailedRequest << endl
             << "totalSuccessRequest is " << totalSuccessRequest << endl
             << "totalFailedRequest is " << totalFailedRequest << endl;

        delete[] statsReqs; 
        delete[] thr_args;
        delete[] tids;

        curl_global_cleanup();
        return 0;
    }
    

    但是上述代码并没有针对前面提出的问题做专门的改进,只改进了并发控制,而且实现并不完美:更好的方式是在其中使用nanosleep函数精确控制请求的发送时间,所以还需要进一步改进。

    展开全文
  • //curl_easy_setopt(m_curl_handle, CURLOPT_USERAGENT, "libcurl-agent/1.0"); // https, skip the verification of the server's certificate. curl_easy_setopt(m_curl_handle, CURLOPT_SSL_VERIFYPEER, 0L);...

    DownloadThread.h

    #ifndef DOWNLOAD_THREAD_H
    #define DOWNLOAD_THREAD_H
    
    #include <atomic>
    #include <condition_variable>
    #include <functional>
    #include <list>
    #include <mutex>
    #include <string>
    #include <thread>
    #include <curl/curl.h>
    
    /*
     * One download request together with its result.
     *
     * Filled by the caller before AddDownloadTask(): nIndex, strUrl, strDescription.
     * Filled by the worker before the callback runs: strData, nErrorCode, strErrorMsg.
     * The integer fields default to 0 so a plain `Task t;` never carries
     * indeterminate values into the callback.
     */
    typedef struct _Task_
    {
        int                 nIndex = 0;       // image index
        std::string         strUrl;           // url: http or https
        std::string         strDescription;   // identifies the image (ID); passed through to the callback
        std::string         strData;          // downloaded response body
        int                 nErrorCode = 0;   // 0 on success; CURLcode or HTTP status code on failure
        std::string         strErrorMsg;      // error text, empty on success
    } Task;
    
    /* Callback invoked on the worker thread once per finished task (success or failure). */
    using DownloadThreadCallback = std::function<void(const Task&)>;
    
    /*
     * Downloads queued URLs one at a time on a single background thread, reusing
     * one libcurl easy handle, and reports every result through a callback.
     *
     * Usage: Initialize() -> AddDownloadTask() ... -> Finish().
     * Members carry default initializers so Finish() never reads indeterminate
     * pointers even if Initialize() was not called or failed.
     */
    class DownloadThread
    {
    public:
        DownloadThread() = default;
        ~DownloadThread() = default;
    
        /* Stores the callback, sets up libcurl and starts the worker. Returns -1 if setup fails. */
        int Initialize(const DownloadThreadCallback &cb);
        /* Queues a copy of the task for the worker; always returns 0. */
        int AddDownloadTask(const Task &task);
        /* Stops the worker, joins it and releases libcurl resources. */
        void Finish();
    
    private:
        int InitializeCURL();
        void FinishCURL();
        void Run();
        void CallBack(const Task &t);
    
    private:
        int                     m_threadId = 0;       // NOTE(review): unused by the visible implementation
        DownloadThreadCallback  m_callback;           // user callback, invoked from the worker thread
        CURL                   *m_curl_handle = nullptr;
        std::thread            *m_thread = nullptr;   // owned worker thread, created by Initialize()
        std::atomic<bool>       m_bIsRunning{false};  // worker loop keeps going while true
        std::list<Task>         m_taskList;           // pending tasks, guarded by m_mutexTaskList
        std::mutex              m_mutexTaskList;
        std::condition_variable m_condition;          // wakes the worker when the queue/state changes
    };
    
    #endif // DOWNLOAD_THREAD_H

    DownloadThread.cpp

    #include "DownloadThread.h"
    
    #include "string.h"
    #include <iostream>
    #include <string>
    
    static size_t
    WriteMemoryCallback(void *contents, size_t size, size_t nmemb, void *userp)
    {
        size_t realsize = size * nmemb;
    
        char* memory = (char*)malloc(realsize);
        if (memory == NULL) {
            /* out of memory! */
            printf("not enough memory (realloc returned NULL)\n");
            return 0;
        }
        memcpy(memory, contents, realsize);
    
        Task *t = (Task*)userp;
        t->strData.append(memory, realsize);
    
        free(memory);
    
        return realsize;
    }
    
    
    /*
     * Stores the result callback, prepares libcurl and starts the worker thread
     * running Run().
     *
     * Returns 0 on success, -1 if cb is empty or InitializeCURL() reports failure.
     */
    int DownloadThread::Initialize(const DownloadThreadCallback &cb)
    {
        // Invoking an empty std::function on the worker thread would throw
        // std::bad_function_call and terminate the process.
        if (!cb)
        {
            return -1;
        }
    
        m_callback = cb;
        if (InitializeCURL() != 0)
        {
            return -1;
        }
    
        // Set before the thread starts so Run() sees the running state.
        m_bIsRunning = true;
        m_thread = new std::thread(&DownloadThread::Run, this);
        return 0;
    }
    
    void DownloadThread::Finish()
    {
        m_bIsRunning = false;
        if (m_thread->joinable())
        {
            m_thread->join();
        }
    
        FinishCURL();
    }
    
    void DownloadThread::FinishCURL()
    {
        /* cleanup curl stuff */
        curl_easy_cleanup(m_curl_handle);
    
        /* we're done with libcurl, so clean it up */
        curl_global_cleanup();
    }
    
    int DownloadThread::AddDownloadTask(const Task &task)
    {
        std::lock_guard<std::mutex> lk(m_mutexTaskList);
        m_taskList.push_back(task);
        m_condition.notify_one();
        return 0;
    }
    
    void DownloadThread::Run()
    {
        while (m_bIsRunning.load())
        {
            //std::cout << "thread is running" << std::endl;
    
            std::unique_lock<std::mutex>  lk(m_mutexTaskList);
            while (m_taskList.empty())
            {
                m_condition.wait(lk);
            }
    
            // get first element
            Task task;
            {
                //std::lock_guard<std::mutex> lkg(m_mutexTaskList);
                task = m_taskList.front();
                m_taskList.pop_front();
            }
            //std::cout << "get task url: " << task.strUrl << std::endl;
            
            CURLcode res;
            curl_easy_setopt(m_curl_handle, CURLOPT_WRITEDATA, (void *)&task);
            /* specify URL to get */
            curl_easy_setopt(m_curl_handle, CURLOPT_URL, const_cast<char*>(task.strUrl.c_str()));
            //curl_easy_setopt(m_curl_handle, CURLOPT_URL, "http://10.66.91.15:7777/ld/smog/2612_src.jpg");
    
            /* get it! */
            res = curl_easy_perform(m_curl_handle);
            /* check for errors */
            if (res != CURLE_OK) {
                // fprintf(stderr, "curl_easy_perform() failed: %s\n", curl_easy_strerror(res));
                task.nErrorCode = res;
                task.strErrorMsg = curl_easy_strerror(res);
            }
            else {
                /*
                * Now, our chunk.memory points to a memory block that is chunk.size
                * bytes big and contains the remote file.
                *
                * Do something nice with it!
                */
                /*printf("%lu bytes retrieved\n", task.strData.size());*/
                long lStateCode = 0;
                res = curl_easy_getinfo(m_curl_handle, CURLINFO_RESPONSE_CODE, &lStateCode);
                if (res != CURLE_OK || lStateCode != 200)
                {
                    task.nErrorCode = lStateCode;
                    // task.strErrorMsg = curl_easy_strerror(res);
                    task.strErrorMsg = "http state code is " + std::to_string(lStateCode);
                }
            }
    
            // callback
            CallBack(task);
        }
    }
    
    int DownloadThread::InitializeCURL()
    {
        CURLcode error;
    
        curl_global_init(CURL_GLOBAL_ALL);
        /* init the curl session */
        m_curl_handle = curl_easy_init();
    
        ///* specify URL to get */
        curl_easy_setopt(curl_handle, CURLOPT_URL, "http://10.66.91.15:7777/ld/smog/2612_src.jpg");
    
        /* send all data to this function */
        curl_easy_setopt(m_curl_handle, CURLOPT_WRITEFUNCTION, WriteMemoryCallback);
        /* we pass our 'chunk' struct to the callback function */
        /*curl_easy_setopt(m_curl_handle, CURLOPT_WRITEDATA, (void *)this);*/
    
        curl_easy_setopt(m_curl_handle, CURLOPT_FOLLOWLOCATION, 1);
    
    
        /* some servers don't like requests that are made without a user-agent
        field, so we provide one */
        //curl_easy_setopt(m_curl_handle, CURLOPT_USERAGENT, "libcurl-agent/1.0");
    
        // https, skip the verification of the server's certificate.
        curl_easy_setopt(m_curl_handle, CURLOPT_SSL_VERIFYPEER, 0L);
        curl_easy_setopt(m_curl_handle, CURLOPT_SSL_VERIFYHOST, 0L);
    
        /* 设置连接超时,单位:毫秒 */
        curl_easy_setopt(m_curl_handle, CURLOPT_CONNECTTIMEOUT_MS, 1000L);
    
        // add by yexiaoyogn 10 second time out 
        curl_easy_setopt(m_curl_handle, CURLOPT_TIMEOUT_MS, 2000);
    
        //add yexiaoyong set time out
        curl_easy_setopt(m_curl_handle, CURLOPT_NOSIGNAL, 3);
    }
    
    /* Delivers a finished task to the user callback; called from Run() on the worker thread. */
    void DownloadThread::CallBack(const Task &t)
    {
        // An empty std::function would throw std::bad_function_call here and
        // terminate the worker thread (and with it the process).
        if (m_callback)
        {
            m_callback(t);
        }
    }
    

    demo.cpp

    #include "DownloadThread.h"
    
    #include <iostream>
    #include <thread>
    #include <sstream>
    #include <fstream>
    #include <string>
    
    /*
     * Demo callback: logs the outcome of a task and, on success, writes the
     * downloaded body to "<strDescription>.jpg".
     */
    void TaskCallBack(const Task &t)
    {
        std::ostringstream str;
        str << "callback ...." << std::endl;
        str << "task id is " << t.nIndex << " url is " << t.strUrl << " data size is " << t.strData.size() << std::endl;
        str << "error number is " << t.nErrorCode << " error msg is " << t.strErrorMsg << std::endl;
        std::cout << str.str() << std::endl;
    
        if (t.nErrorCode == 0)
        {
            const std::string filename = t.strDescription + ".jpg";
            // Truncate rather than append, so a re-run does not concatenate
            // several downloads into one corrupt file.
            std::ofstream file(filename, std::ios::out | std::ios::binary | std::ios::trunc);
            if (!file)
            {
                std::cout << "failed to open " << filename << std::endl;
                return;
            }
            file.write(t.strData.data(), static_cast<std::streamsize>(t.strData.size()));
            if (!file)
            {
                std::cout << "failed to write " << filename << std::endl;
            }
        }
    }
    
    
    int main()
    {
        DownloadThread dt;
        dt.Initialize(TaskCallBack);
    
        Task t;
        int cnt = 0;
        while (true)
        {
            t.nIndex = cnt++;
            t.strDescription = "task_"+std::to_string(cnt);
            t.strUrl = "http://10.66.91.15:7777/ld/smog/2612_src.jpg";
            dt.AddDownloadTask(t);
    
            t.nIndex = cnt++;
            t.strDescription = "task_" + std::to_string(cnt);
            t.strUrl = "http://www.baidu.com";
            dt.AddDownloadTask(t);
    
            t.nIndex = cnt++;
            t.strDescription = "task_" + std::to_string(cnt);
            t.strUrl = "http://10.66.91.15:7777/ld/smog/2612_srcxzz.jpg";
            dt.AddDownloadTask(t);
        }
    
        while (true){
            std::this_thread::sleep_for(std::chrono::seconds(1));
        }
        dt.Finish();
    }

    转载于:https://www.cnblogs.com/walkinginthesun/p/9621198.html

    展开全文
  • http客户端 基于http异步请求的libcurl
  •  curl是一款利用URL语法进行文件传输的工具,它支持多种协议,包括FTP, FTPS, HTTP, HTTPS, GOPHER, TELNET等,我们既可以在命令行上使用它,也可以利用 ...相信大部分同学都应该使用过libcurl的easy 接口,ea
  • PHP标准库内置curl扩展,不过实现不完整,如multi_socket_action接口,无意中发现pecl http库同样基于libcurl封装,支持更多的libcurl特性,更新也比较快,底层通过libevent(epoll)实现multi_socket_action接口,...
  • 为了实现curl高性能,高并发,需要研究如何实现高性能高并发。研究方向有三个。 (1) 长连接。考虑采用长连接的方式去开发。首先研究下长连接和短连接的性能区别。curl内部是通过socket去连接通讯。socket每次连接...
  • 使用libcurl实现的下载器,取消下载

    千次阅读 2016-11-04 10:07:47
    转载文章 http://blog.csdn.net/robertbaker/article/details/43703907 转载文章 ... 谢谢版主 一、使用libcurl实现的下载器 libcurl的主页:http://curl.haxx.se/ 头文件: /*******...
  • 利用epoll实现异步IO

    2019-01-21 10:50:47
    利用epoll实现异步IO  之前异步IO一直没搞明白,大致的理解就是在一个大的循环中,有两部分:第一部分是监听事件;第二部分是处理事件(通过添加回调函数的方式)。就拿网络通信来说,可以先通过调用 ...
  • Libcurl实现断点续传

    2016-07-15 13:39:41
    一、Libcurl简单介绍  其实关于Libcurl的介绍最好的是看官方文档:http://curl.haxx.se/ 几乎大部分的信息里面都能够查找到。  在这边简要介绍:  1)跨平台特性,几乎所有平台都可以使用  2)有...
  • php_http_parser PHP解析HTTP扩展基于node.js http-parser的PHP扩展,可用于实现异步PHP程序libcurl提供了异步调用方式,有两种风格:ONE MULTI HANDLE MANY EASY HANDLES:加入多个easy handle后执行curl_multi_...
  • 基于libcurl官网demo,实现http post 并发 异步 demo。以文件为介质保存post参数,执行后,将失败链接的参数回存此文件。HandleCurlMulti 以轮询方式进行,可以设置每次的吞吐量。待优化的地方为文件存储,如果多...
  • 本文承接自前篇博客将Cocos2d-x的...2)实现Android异步任务 下面直接上代码: 1)jni之first.c:first.h无变化,first.c添加如下代码,切记C变量必须声明在函数的开始 /* * Copyright (C) 2009 The Android
  • pb libcurl http https 调用

    2020-08-19 01:19:49
    pb11.5 + pbni + libcurl 调用http、https,实现了get、post(返回文本支持gzip解压,post支持gzip压缩上传)、文件上传、下载(带进度条显示);提供同步、异步两种操作方式;pb11.5以上版本,可自行升级,应该可以...
  • 采用libuv的epoll方式实现异步高性能libcurl发送数据的方法 讲述了采用libcurl发送数据的基础方法和高性能方法,基础方法较为容易但性能一般,高级方法的性能卓越但比较难理解,这里再给出一个保证性能的同时又...
  • 官方也给出了解决方法,可以使用c-ares[2]的libcurl版本实现异步域名解析来预防这种“糟糕”的情况,但是最后一句还是告诫我们:在多线程场景下,若不设置CURLOPT_NOSIGNAL选项,可能会有“意外”的情况发生。...
  • Socket编程系列初步计划:Socket编程系列之...然后亲自动手跟着视频练习Libcurl实现HTTP网络编程和FTP编程,并调试分析。然后进行Libcurl异步请求、提高IO效率,并讲解原理。最后项目实战进行Libcurl网络爬虫编程实战。
  • 基于libcurl的开发

    千次阅读 2022-03-21 10:32:18
    libcurl使用方法简介 1.简介 libcurl是一个跨平台的开源网络协议库,支持http, https, rtsp等多种协议 。libcurl同样支持HTTPS证书授权,HTTP POST, HTTP PUT, FTP 上传, HTTP基本表单上传,代理,cookies,和用户...
  • libcurl入门之简介

    2021-03-06 14:32:30
    1 简介 1.1 支持协议 DICT, FILE, FTP, FTPS, GOPHER, GOPHERS, HTTP, HTTPS, IMAP, IMAPS, LDAP, LDAPS, MQTT, POP3,... curl 支持 SSL 认证, libcurl同样支持HTTPS证书授权,HTTP POST, HTTP PUT, FTP 上传, HTTP基
  • libcurl 域名解析分析

    2020-11-17 15:07:40
    背景 我们公司的产品使用 libcurl 作为基础网络库,线上环境中经常会有域名解析失败导致的问题。libcurl 的域名解析默认情况下是调用系统 API 完成的,并且用户的网络环境...这篇文章主要记录一下我是如何实现 libcurl
  • 需求分析:需要同时支持同步和异步请求,异步请求的结果以可选的回调通知方式返回。 本人以Linux为例,一步一步地来实现。 配置并且编译libcurl:我以在Linux下的交叉编译为例。libcurl源码下载:...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 2,313
精华内容 925
关键字:

libcurl实现异步