精华内容
下载资源
问答
  • 一 概念说明1 模型 并行复制是典型生产者、消费者模式,Coordinator作为生产者,worker线程作为消费者。2 Waiting for preceding transaction to commit 当前事务无法和正在回放事务并发回放出现等待二 延迟...

    一 概念说明

    1 模型 并行复制是典型的生产者、消费者模式,Coordinator作为生产者,worker线程作为消费者。

    2 Waiting for preceding transaction to commit 当前事务无法和正在回放的事务并发回放出现的等待

    二 延迟出现的err日志打印说明

    可以根据日志统计进行分析

    Multi-threaded slave statistics for channel ”:

    seconds elapsed = 121;

    eventsassigned = 100374529; 总共有多少个event被分配执行,计的是总数。

    queues filled over overrun level = 0; 多线程同步中,worker 的私有队列长度超长的次数,计的是总数。

    waited due aWorker queue full = 0; 因为worker的队列超长而产生等待的次数,计的是总数

    waited due the total size = 0; 超过最大size的次数

    waited at clock conflicts= 1451875661700

    waited (count) when Workers occupied = 3211993 因为workder被占用而出现等待的次数。(总计值)。

    waited when Workers occupied = 445032386000 因为workder被占用而出现等待的总时间,总计值,单位是纳秒。

    三 出现的几种情况

    1 主从同步发生错误,导致从库延时

    观察 这里可以对sql_error和双线程进行观察,就能观察出问题

    解决方式 进行数据修复,保证主从数据的一致性

    2 主从同步发生大事务,导致从库延时

    观察

    1 通过show processlist进行观察

    2 exec_master_position 一直不会变

    3 SQL STATUS 一直出现 Waiting for preceding transaction to commit

    大表->DDL/大事务的执行是并行复制所无法解决的,会拖累甚至卡住整个复制进度

    解决方式 大事务进行拆分,表进行拆分,避免或者减少这种情况的发生

    3 主库压力很大,同时并发数高,导致从库应用繁忙

    观察 1 观察主库binlog生成量和事务监控峰值

    2 从库执行语句

    SELECT thread_id,count_star FROM performance_schema.events_transactions_summary_by_thread_by_event_name

    WHERE thread_id IN (

    SELECT thread_id FROM performance_schema.replication_applier_status_by_worker);

    这条语句是用来统计worker线程应用事务的并发度数量的,可以进行推测

    3 从库的util值非常高

    解决方式 分库分表,改造业务,减少单台集群的压力

    四 总结

    和我之前排查异步复制的思路差不多,只是在并行复制的角度下

    展开全文
  • 上篇文章提起关于HBase插入性能优化设计到的五个参数,从参数配置的角度给大家提供了一个性能测试环境的实验代码。...本文给出了基于多线程并发模式的,测试代码案例和实测结果,希望能给大家一些启示。

    一、引言:

      上篇文章提起关于HBase插入性能优化设计到的五个参数,从参数配置的角度给大家提供了一个性能测试环境的实验代码。根据网友的反馈,基于单线程的模式实现的数据插入毕竟有限。通过个人实测,在我的虚拟机环境下,单线程插入数据的值约为4w/s。集群指标是:CPU双核1.83,虚拟机512M内存,集群部署单点模式。本文给出了基于多线程并发模式的,测试代码案例和实测结果,希望能给大家一些启示:

    二、源程序:

    复制代码
      1 import org.apache.hadoop.conf.Configuration;
      2 import org.apache.hadoop.hbase.HBaseConfiguration;
      3 import java.io.BufferedReader;
      4 import java.io.File;
      5 import java.io.FileNotFoundException;
      6 import java.io.FileReader;
      7 import java.io.IOException;
      8 import java.util.ArrayList;
      9 import java.util.List;
     10 import java.util.Random;
     11 
     12 import org.apache.hadoop.conf.Configuration;
     13 import org.apache.hadoop.hbase.HBaseConfiguration;
     14 import org.apache.hadoop.hbase.client.HBaseAdmin;
     15 import org.apache.hadoop.hbase.client.HTable;
     16 import org.apache.hadoop.hbase.client.HTableInterface;
     17 import org.apache.hadoop.hbase.client.HTablePool;
     18 import org.apache.hadoop.hbase.client.Put;
     19 
     20 public class HBaseImportEx {
     21     static Configuration hbaseConfig = null;
     22     public static HTablePool pool = null;
     23     public static String tableName = "T_TEST_1";
     24     static{
     25          //conf = HBaseConfiguration.create();
     26          Configuration HBASE_CONFIG = new Configuration();
     27          HBASE_CONFIG.set("hbase.master", "192.168.230.133:60000");
     28          HBASE_CONFIG.set("hbase.zookeeper.quorum", "192.168.230.133");
     29          HBASE_CONFIG.set("hbase.zookeeper.property.clientPort", "2181");
     30          hbaseConfig = HBaseConfiguration.create(HBASE_CONFIG);
     31          
     32          pool = new HTablePool(hbaseConfig, 1000); 
     33     }
     34     /*
     35      * Insert Test single thread
     36      * */
     37     public static void SingleThreadInsert()throws IOException
     38     {
     39         System.out.println("---------开始SingleThreadInsert测试----------");
     40         long start = System.currentTimeMillis();
     41         //HTableInterface table = null;
     42         HTable table = null;
     43         table = (HTable)pool.getTable(tableName);
     44         table.setAutoFlush(false);
     45         table.setWriteBufferSize(24*1024*1024);
     46         //构造测试数据
     47         List<Put> list = new ArrayList<Put>();
     48         int count = 10000;
     49         byte[] buffer = new byte[350];
     50         Random rand = new Random();
     51         for(int i=0;i<count;i++)
     52         {
     53             Put put = new Put(String.format("row %d",i).getBytes());
     54             rand.nextBytes(buffer);
     55             put.add("f1".getBytes(), null, buffer);
     56             //wal=false
     57             put.setWriteToWAL(false);
     58             list.add(put);    
     59             if(i%10000 == 0)
     60             {
     61                 table.put(list);
     62                 list.clear();    
     63                 table.flushCommits();
     64             }            
     65         }
     66         long stop = System.currentTimeMillis();
     67         //System.out.println("WAL="+wal+",autoFlush="+autoFlush+",buffer="+writeBuffer+",count="+count);
     68           
     69         System.out.println("插入数据:"+count+"共耗时:"+ (stop - start)*1.0/1000+"s");
     70         
     71         System.out.println("---------结束SingleThreadInsert测试----------");
     72     }
     73     /*
     74      * 多线程环境下线程插入函数 
     75      * 
     76      * */
     77     public static void InsertProcess()throws IOException
     78     {
     79         long start = System.currentTimeMillis();
     80         //HTableInterface table = null;
     81         HTable table = null;
     82         table = (HTable)pool.getTable(tableName);
     83         table.setAutoFlush(false);
     84         table.setWriteBufferSize(24*1024*1024);
     85         //构造测试数据
     86         List<Put> list = new ArrayList<Put>();
     87         int count = 10000;
     88         byte[] buffer = new byte[256];
     89         Random rand = new Random();
     90         for(int i=0;i<count;i++)
     91         {
     92             Put put = new Put(String.format("row %d",i).getBytes());
     93             rand.nextBytes(buffer);
     94             put.add("f1".getBytes(), null, buffer);
     95             //wal=false
     96             put.setWriteToWAL(false);
     97             list.add(put);    
     98             if(i%10000 == 0)
     99             {
    100                 table.put(list);
    101                 list.clear();    
    102                 table.flushCommits();
    103             }            
    104         }
    105         long stop = System.currentTimeMillis();
    106         //System.out.println("WAL="+wal+",autoFlush="+autoFlush+",buffer="+writeBuffer+",count="+count);
    107           
    108         System.out.println("线程:"+Thread.currentThread().getId()+"插入数据:"+count+"共耗时:"+ (stop - start)*1.0/1000+"s");
    109     }
    110     
    111     
    112     /*
    113      * Mutil thread insert test
    114      * */
    115     public static void MultThreadInsert() throws InterruptedException
    116     {
    117         System.out.println("---------开始MultThreadInsert测试----------");
    118         long start = System.currentTimeMillis();
    119         int threadNumber = 10;
    120         Thread[] threads=new Thread[threadNumber];
    121         for(int i=0;i<threads.length;i++)
    122         {
    123             threads[i]= new ImportThread();
    124             threads[i].start();            
    125         }
    126         for(int j=0;j< threads.length;j++)
    127         {
    128              (threads[j]).join();
    129         }
    130         long stop = System.currentTimeMillis();
    131           
    132         System.out.println("MultThreadInsert:"+threadNumber*10000+"共耗时:"+ (stop - start)*1.0/1000+"s");        
    133         System.out.println("---------结束MultThreadInsert测试----------");
    134     }    
    135 
    136     /**
    137      * @param args
    138      */
    139     public static void main(String[] args)  throws Exception{
    140         // TODO Auto-generated method stub
    141         //SingleThreadInsert();        
    142         MultThreadInsert();
    143         
    144         
    145     }
    146     
    147     public static class ImportThread extends Thread{
    148         public void HandleThread()
    149         {                        
    150             //this.TableName = "T_TEST_1";
    151         
    152             
    153         }
    154         //
    155         public void run(){
    156             try{
    157                 InsertProcess();            
    158             }
    159             catch(IOException e){
    160                 e.printStackTrace();                
    161             }finally{
    162                 System.gc();
    163                 }
    164             }            
    165         }
    166 
    167 }
    复制代码

    三、说明

    1.线程数设置需要根据本集群硬件参数,实际测试得出。否则线程过多的情况下,总耗时反而是下降的。

    2.单笔提交数对性能的影响非常明显,需要在自己的环境下,找到最理想的数值,这个需要与单条记录的字节数相关。

    四、测试结果

    ---------开始MultThreadInsert测试----------

    线程:8插入数据:10000共耗时:1.328s
    线程:16插入数据:10000共耗时:1.562s
    线程:11插入数据:10000共耗时:1.562s
    线程:10插入数据:10000共耗时:1.812s
    线程:13插入数据:10000共耗时:2.0s
    线程:17插入数据:10000共耗时:2.14s
    线程:14插入数据:10000共耗时:2.265s
    线程:9插入数据:10000共耗时:2.468s
    线程:15插入数据:10000共耗时:2.562s
    线程:12插入数据:10000共耗时:2.671s
    MultThreadInsert:100000共耗时:2.703s
    ---------结束MultThreadInsert测试----------


    备注:该技术专题讨论正在群Hadoop高级交流群:293503507同步直播中,敬请关注。

    展开全文
  • 读写锁与互斥量类似,但读写锁允许更高的并行性。其特性为:写独占,读共享。 读写锁特性: (1)读写锁是“写模式加锁”时,解锁前,所有对该锁加锁线程都会被阻塞。 (2)读写锁是“读模式加锁”时,如果线程以读...

    案例一

    使用读写锁实现线程同步
    读写锁与互斥量类似,但读写锁允许更高的并行性。其特性为:写独占,读共享。
    读写锁特性:
    (1)读写锁是“写模式加锁”时,解锁前,所有对该锁加锁的线程都会被阻塞。
    (2)读写锁是“读模式加锁”时,如果线程以读模式对其加锁会成功。如果线程以写模式加锁会阻塞。
    (3)读写锁是“读模式加锁”时,如果有另外线程试图以写模式加锁,读写锁通常会阻塞随后的读模式锁请求,这样可以避免读模式锁长期占用,而等待的写模式锁请求长期阻塞;
    (4)读写锁非常适合于对数据结构读的次数远大于写的情况。

    //案例
    #include <stdio.h>
    #include <stdlib.h>
    #include <pthread.h>
    #include <unistd.h>
    pthread_rwlock_t rwlock;
    void *pthread_one(void *arg)
    {
        /* 分别测试先上写锁和先上读锁的情况 */
        //pthread_rwlock_wrlock(&rwlock);
        pthread_rwlock_rdlock(&rwlock);
        //puts("wrlock locked first, pthread one!");
        puts("rdlock locked first, pthread one!");
        sleep(2);
        puts("after sleep 2s");
        pthread_rwlock_unlock(&rwlock);
    }
    void *pthread_two(void *arg)
    {
        pthread_rwlock_rdlock(&rwlock);
        puts("got the rdlock, pthread two!");
    }
    
    int main()
    {
        int i = 0;
        pthread_t id[2];
        /* 读写锁初始化 */
        pthread_rwlock_init(&rwlock, NULL);
        pthread_create(&id[0], NULL, pthread_one, NULL);
        sleep(1);
        pthread_create(&id[1], NULL, pthread_two, NULL);
        for(; i<2; i++)
            pthread_join(id[i], NULL);
        /* 销毁读写锁 */
        pthread_rwlock_destroy(&rwlock);
        return 0;
    }
    

    案例二

    c语言实现多线程下的链表队列
    项目中需要一个链表,线程A进行入队操作(生产者),线程B进行查询、出队操作(消费者),同时希望线程B在队列为空时阻塞,降低cpu负载,因此考虑用pthread_cond_wait进行实现:
    主要实现功能:
    1)出队和入队的加锁
    2)当有元素入队时唤醒线程B
    3)队列为空时线程B进入休眠

    //头文件
    #ifndef Queue_H
    #define Queue_H
    #include <stdlib.h>
    #include <pthread.h>
    typedef char* Frame;
    typedef struct node * PNode;
    typedef struct node
    {
    	Frame frame;
    	PNode next;
    }Node;
    typedef struct
    {
    	PNode front;
    	PNode rear;
    	int size;
    	pthread_mutex_t q_lock;
    	pthread_cond_t cond;
    }Queue;
    /*构造一个空队列*/
    Queue *InitQueue();
    /*销毁一个队列*/
    void DestroyQueue(Queue *pqueue);
    /*清空一个队列*/
    void ClearQueue(Queue *pqueue);
    /*判断队列是否为空*/
    int IsEmpty(Queue *pqueue);
    /*返回队列大小*/
    int GetSize(Queue *pqueue);
    /*返回队头元素*/
    PNode GetFront(Queue *pqueue, Frame *frame);
    /*返回队尾元素*/
    PNode GetRear(Queue *pqueue, Frame *frame);
    /*将新元素入队*/
    PNode EnQueue(Queue *pqueue,Frame frame);
    /*队头元素出队*/
    PNode DeQueue(Queue *pqueue);
    /*遍历队列并对各数据项调用visit函数*/
    void QueueTraverse(Queue *pqueue,void (*visit)());
    #endif
    
    //C文件
    #include"Queue.h"
    /*构造一个空队列*/
    Queue *InitQueue()
    {
    	Queue *pqueue = (Queue *)malloc(sizeof(Queue));
    	if(pqueue!=NULL)
    	{
    		pqueue->front = NULL;
    		pqueue->rear = NULL;
    		pqueue->size = 0;
    		pthread_mutex_init(&pqueue->q_lock, NULL);		
    		pthread_cond_init(&pqueue->cond, NULL);
    	}
    	return pqueue;
    }
    /*销毁一个队列*/
    void DestroyQueue(Queue *pqueue)
    {
    	if(!pqueue)
    		return;
    	ClearQueue(pqueue);
    	pthread_mutex_destroy(&pqueue->q_lock);
    	pthread_cond_destroy(&pqueue->cond);
    	free(pqueue);
    	pqueue = NULL;
    }
    /*清空一个队列*/
    void ClearQueue(Queue *pqueue)
    {
    	while(!IsEmpty(pqueue)) {
    		DeQueue(pqueue);
    	}
    }
    /*判断队列是否为空*/
    int IsEmpty(Queue *pqueue)
    {
    	if(pqueue->front==NULL&&pqueue->rear==NULL&&pqueue->size==0)
    		return 1;
    	else
    		return 0;
    }
    /*返回队列大小*/
    int GetSize(Queue *pqueue)
    {
    	return pqueue->size;
    }
    /*返回队头元素*/
    PNode GetFront(Queue *pqueue, Frame *frame)
    {
    	pthread_mutex_lock(&pqueue->q_lock);
    	/*
    	if(!IsEmpty(pqueue))
    	{
    		*frame = pqueue->front->frame;
    	}else {
    		pthread_cond_wait(&pqueue->cond, &pqueue->q_lock);
    	}*/
    	while(IsEmpty(pqueue))
    		pthread_cond_wait(&pqueue->cond, &pqueue->q_lock);
    	*frame = pqueue->front->frame;
    	pthread_mutex_unlock(&pqueue->q_lock);
    	return pqueue->front;//---->此处有bug,队列为空时,在锁释放后,pqueue->front可能被入队操作赋值,出现frame等于NULL,而pqueue->front不等于NULL
    }
    /*返回队尾元素*/
    PNode GetRear(Queue *pqueue, Frame *frame)
    {
    	if(!IsEmpty(pqueue)) {
    		*frame = pqueue->rear->frame;
    	}
    	return pqueue->rear;
    }
    /*将新元素入队*/
    PNode EnQueue(Queue *pqueue, Frame frame)
    {
    	PNode pnode = (PNode)malloc(sizeof(Node));
    	if(pnode != NULL) {
    		pnode->frame = frame;
    		pnode->next = NULL;
    		
    		pthread_mutex_lock(&pqueue->q_lock);
    		if(IsEmpty(pqueue)) {
    			pqueue->front = pnode;
    		} else {
    			pqueue->rear->next = pnode;
    		}
    		pqueue->rear = pnode;
    		pqueue->size++;
    		pthread_cond_signal(&pqueue->cond);
    		pthread_mutex_unlock(&pqueue->q_lock);
    	}
    	return pnode;
    }
    /*队头元素出队*/
    PNode DeQueue(Queue *pqueue)
    {
    	PNode pnode = pqueue->front;
    	pthread_mutex_lock(&pqueue->q_lock);
    	if(!IsEmpty(pqueue)) {
    		pqueue->size--;
    		pqueue->front = pnode->next;
    		free(pnode);
    		if(pqueue->size==0)
    			pqueue->rear = NULL;
    	}
    	pthread_mutex_unlock(&pqueue->q_lock);
    	return pqueue->front;
    }
    /*遍历队列并对各数据项调用visit函数*/
    void QueueTraverse(Queue *pqueue, void (*visit)())
    {
    	PNode pnode = pqueue->front;
    	int i = pqueue->size;
    	while(i--)
    	{
    		visit(pnode->frame);
    		pnode = pnode->next;
    	}	
    }
    
    

    案例三

    多线程编程 – 线程安全的链表
    双链表中每个节点都有一个指针指向列表中下一个节点,还有一个指针指向前一个节点。其中不变量就是节点A中指向“下一个”节点B的指针,还有前向指针。为了从列表中删除一个节点,其两边节点的指针都需要更新。当其中一边更新完成时,不变量就被破坏了,直到另一边也完成更新;在两边都完成更新后,不变量就又稳定了。
    从一个列表中删除一个节点的步骤如下
    1.找到要删除的节点N
    2.更新前一个节点指向N的指针,让这个指针指向N的下一个节点
    3.更新后一个节点指向N的指针,让这个指正指向N的前一个节点
    4. 删除节点N
    在这里插入图片描述
    线程间潜在问题就是修改共享数据,致使不变量遭到破坏。当不做些事来确保在这个过程中不会有其他线程进行访问的话,可能就有线程访问到刚刚删除一边的节点;这样的话,线程就读取到要删除节点的数据(因为只有一边的连接被修改,如图3.1(b)),所以不变量就被破坏。破坏不变量的后果是多样,当其他线程按从左往右的顺序来访问列表时,它将跳过被删除的节点。在一方面,如有第二个线程尝试删除图中右边的节点,那么可能会让数据结构产生永久性的损坏,使程序崩溃。无论结果如何,都是并行代码常见错误:条件竞争(race condition)。

    //解决方案: 使用互斥量保护列表
    std::list<int>some_list;
    MutexLock lock;
    void add_to_list(int new_value){
       LockGuard<MutexLock> guard(&lock);
       some_list.push_back(new_value);
    }
    bool list_contains(int value_to_find){
       LockGuard<MutexLock> guard(&lock);
       return std::find(some_list.begin(),some_list.end(),value_to_find) != some_list.end();
    }
    

    清单3.1中有一个全局变量①,这个全局变量被一个全局的互斥量保护②。add_to_list()③和list_contains()④函数中使用lock_guard,使得这两个函数中对数据的访问是互斥的:list_contains()不可能看到正在被add_to_list()修改的列表。
    虽然某些情况下,使用全局变量没问题,但在大多数情况下,互斥量通常会与保护的数据放在同一个类中,而不是定义成全局变量。这是面向对象设计的准则:将其放在一个类中,就可让他们联系在一起,也可对类的功能进行封装,并进行数据保护。在这种情况下,函数add_to_list和list_contains可以作为这个类的成员函数。互斥量和要保护的数据,在类中都需要定义为private成员,这会让访问数据的代码变的清晰,并且容易看出在什么时候对互斥量上锁。当所有成员函数都会在调用时对数据上锁,结束时对数据解锁,那么就保证了数据访问时不变量不被破坏。
    当然,也不是总是那么理想,聪明的你一定注意到了 :当其中一个成员函数返回的是保护数据的指针或引用时,会破坏对数据的保护。具有访问能力的指针或引用可以访问(并可能修改)被保护的数据,而不会被互斥锁限制。互斥量保护的数据需要对接口的设计相当谨慎,要确保互斥量能锁住任何对保护数据的访问,并且不留后门。

    案例四

    C++并发实战18: 线程安全的查找表和链表
    经常遇见根据关键字查找内容的应用如DNS查询,标准库的std::map系列可供选择,但是它们是非线程安全的,一个线程安全的查找表实现如下,其主要是通过hash函数将各个key分散到具体的bucket中去,每个bucket带有一个共享锁boost::shared_mutex,从而实现线程安全的高并发数据结构:

    #include <vector>
    #include <memory>
    #include <mutex>
    #include <functional>
    #include <list>
    #include <utility>
    #include <boost/thread/shared_mutex.hpp>
    template<typename Key,typename Value,typename Hash=std::hash<Key> >
    class threadsafe_lookup_table//只针对每个bucket上锁,而全局的vector不上锁,从而降低了锁的粒度。若使用map则是简单的操作都需要对整个map上锁
    {
    private:
        class bucket_type//具体的桶bucket数据结构,带有共享锁boost::shared_mutex
        {
        private:
            typedef std::pair<Key,Value> bucket_value;
            typedef std::list<bucket_value> bucket_data;
            typedef typename bucket_data::iterator bucket_iterator;
            bucket_data data;
            mutable boost::shared_mutex mutex;
     
            bucket_iterator find_entry_for(Key const& key) const
            {
                return std::find_if(data.begin(),data.end(),
                    [&](bucket_value const& item)
                    {return item.first==key;});
            }
        public:
            Value value_for(Key const& key,Value const& default_value) const
            {
                boost::shared_lock<boost::shared_mutex> lock(mutex);
                bucket_iterator const found_entry=find_entry_for(key);
                return (found_entry==data.end())?
                    default_value : found_entry->second;
            }
            void add_or_update_mapping(Key const& key,Value const& value)//异常安全的
            {
                std::unique_lock<boost::shared_mutex> lock(mutex);
                bucket_iterator const found_entry=find_entry_for(key);
                if(found_entry==data.end())
                {
                    data.push_back(bucket_value(key,value));//若push_back抛出异常不会影响原来的值
                }
                else
                {
                    found_entry->second=value;//赋值抛出异常,原始值仍然为改变
                }
            }
            void remove_mapping(Key const& key)
            {
                std::unique_lock<boost::shared_mutex> lock(mutex);
                bucket_iterator const found_entry=find_entry_for(key);
                if(found_entry!=data.end())
                {
                    data.erase(found_entry);
                }
            }
        };
        std::vector<std::unique_ptr<bucket_type> > buckets;//vector里有多个桶bucket,通过hash函数将key散列到一个具体的bucket中去
        Hash hasher;
        bucket_type& get_bucket(Key const& key) const//从vector找出key散列后对应的bucket
        {
            std::size_t const bucket_index=hasher(key)%buckets.size();
            return *buckets[bucket_index];
        }
    public:
        typedef Key key_type;
        typedef Value mapped_type;
        typedef Hash hash_type; 
        threadsafe_lookup_table(//构造函数
            unsigned num_buckets=19, Hash const& hasher_=Hash()):
            buckets(num_buckets),hasher(hasher_)
        {
            for(unsigned i=0;i<num_buckets;++i)
            {
                buckets[i].reset(new bucket_type);
            }
        }
        threadsafe_lookup_table(threadsafe_lookup_table const& other)=delete;//为了简化代码禁止拷贝和赋值
        threadsafe_lookup_table& operator=(
            threadsafe_lookup_table const& other)=delete;
        Value value_for(Key const& key,Value const& default_value=Value()) const//value_for是lock free的,可以多个线程并发调用,不修改任何数据故异常安全
        {
            return get_bucket(key).value_for(key,default_value);
        } 
        void add_or_update_mapping(Key const& key,Value const& value)//异常安全的,因为add_or_update_mapping是异常安全的
        {
            get_bucket(key).add_or_update_mapping(key,value);
        }
        void remove_mapping(Key const& key)
        {
            get_bucket(key).remove_mapping(key);//erase是异常安全的
        }
    };
    

    这对链表实现一个线程安全的版本,链表中每个元素都持有一个mutex,从而对链表的每一个操作至多持有当前节点和下一节点的mutex,这样锁的粒度更细了提高了并发的性能:

    #include <memory>
    #include <mutex>
    template<typename T>
    class threadsafe_list
    {
        struct node//每个节点持有一个mutex
        {
            std::mutex m;
            std::shared_ptr<T> data;
            std::unique_ptr<node> next;
     
            node():
                next()
            {}
            node(T const& value):
                data(std::make_shared<T>(value))
            {}
        };
        node head;
    public:
        threadsafe_list()
        {} 
        ~threadsafe_list()
        {
            remove_if([](T const&){return true;});
        }
        threadsafe_list(threadsafe_list const& other)=delete;
        threadsafe_list& operator=(threadsafe_list const& other)=delete;
        void push_front(T const& value)//从头部插入一个节点只需要锁住head
        {
            std::unique_ptr<node> new_node(new node(value));//在临界区外new,这样既可以减小临界区又可以避免临界区中抛出异常
            std::lock_guard<std::mutex> lk(head.m);
            new_node->next=std::move(head.next);//unique_ptr不能直接赋值,但可以通过reset或move
            head.next=std::move(new_node);
        }
     
        template<typename Function>
        void for_each(Function f)//针对链表中每个元素执行f
        {
            node* current=&head;
            std::unique_lock<std::mutex> lk(head.m);
            while(node* const next=current->next.get())
            {
                std::unique_lock<std::mutex> next_lk(next->m);//锁住当前节点后,立即释放上一个节点
                lk.unlock();//
                f(*next->data);
                current=next;
                lk=std::move(next_lk);//向后移动,unique_lock is moveable not copyable,而lock_guard不具备移动语义,可见unique_lock比lock_guard灵活
            }
        }
        template<typename Predicate>
        std::shared_ptr<T> find_first_if(Predicate p)//找到链表中事谓词P返回true的第一个元素
        {
            node* current=&head;
            std::unique_lock<std::mutex> lk(head.m);
            while(node* const next=current->next.get())
            {
                std::unique_lock<std::mutex> next_lk(next->m);
                lk.unlock();//拿到当前元素的锁后立即释放上一个锁
                if(p(*next->data))//谓词P返回true,那么返回该元素
                {
                    return next->data;
                }
                current=next;
                lk=std::move(next_lk);
            }
            return std::shared_ptr<T>();
        } 
        template<typename Predicate>
        void remove_if(Predicate p)//删除哪些使得谓词P返回true的元素
        {
            node* current=&head;
            std::unique_lock<std::mutex> lk(head.m);
            while(node* const next=current->next.get())
            {
                std::unique_lock<std::mutex> next_lk(next->m);
                if(p(*next->data))
                {
                    std::unique_ptr<node> old_next=std::move(current->next);
                    current->next=std::move(next->next);//重置连接
                    next_lk.unlock();//注意这里并没有对lk解锁或者重置
                }
                else
                {
                    lk.unlock();
                    current=next;
                    lk=std::move(next_lk);
                }
            }
        }
    };
    
    

    案例五

    C语言高级篇 - 4.链表&状态机与多线程

    //构建一个简单的单链表
    //目标:构建一个链表,然后将一些数据(譬如1,2,3三个数字)存储在链表中
    #include <stdio.h>
    #include <strings.h>
    #include <stdlib.h>
    // 构建一个链表的节点
    struct node
    {
        int data;              // 有效数据
        struct node *pNext;       // 指向下一个节点的指针
    };
    int main(void)
    {
        // 定义头指针
        struct node *pHeader = NULL;
    /********************************************************************/
        // 每创建一个新的节点,把这个新的节点和它前一个节点关联起来
        // 创建一个链表节点
        struct node *p = (struct node *)malloc(sizeof(struct node));
        if (NULL == p)
        {
            printf("malloc error.\n");
            return -1;
        }
        // 清理申请到的堆内存
        bzero(p, sizeof(struct node));
        // 填充节点
        p->data = 1;
        p->pNext = NULL;           // 将来要指向下一个节点的首地址
                                    // 实际操作时将下一个节点malloc返回的指针赋值给这个                             
        pHeader = p;  // 将本节点和它前面的头指针关联起来                       /********************************************************************/
    /********************************************************************/
        // 每创建一个新的节点,把这个新的节点和它前一个节点关联起来
        // 创建一个链表节点
        struct node *p1 = (struct node *)malloc(sizeof(struct node));
        if (NULL == p1)
        {
            printf("malloc error.\n");
            return -1;
        }
        // 清理申请到的堆内存
        bzero(p1, sizeof(struct node));
        // 填充节点
        p1->data = 2;
        p1->pNext = NULL;          // 将来要指向下一个节点的首地址
                                    // 实际操作时将下一个节点malloc返回的指针赋值给这个                            
        p->pNext = p1; // 将本节点和它前面的头指针关联起来                   /********************************************************************/     
    /********************************************************************/
        // 每创建一个新的节点,把这个新的节点和它前一个节点关联起来
        // 创建一个链表节点
        struct node *p2 = (struct node *)malloc(sizeof(struct node));
        if (NULL == p2)
        {
            printf("malloc error.\n");
            return -1;
        }
        // 清理申请到的堆内存
        bzero(p2, sizeof(struct node));
        // 填充节点
        p2->data = 3;
        p1->pNext = p2;            // 将来要指向下一个节点的首地址
                                    // 实际操作时将下一个节点malloc返回的指针赋值给这个            
        /********************************************************************/
        // 至此创建了一个有1个头指针+3个完整节点的链表。
        // 下面是4.9.3节的代码
        // 访问链表中的各个节点的有效数据,这个访问必须注意不能使用p、p1、p2,而只能
        // 使用pHeader。
        // 访问链表第1个节点的有效数据
        printf("node1 data: %d.\n", pHeader->data);  
        printf("p->data: %d.\n", p->data);          // pHeader->data等同于p->data
        // 访问链表第2个节点的有效数据
        printf("node2 data: %d.\n", pHeader->pNext->data);
        printf("p1->data: %d.\n", p1->data);   
        // pHeader->pNext->data等同于p1->data
        // 访问链表第3个节点的有效数据
        printf("node3 data: %d.\n", pHeader->pNext->pNext->data);  
        printf("p2->data: %d.\n", p2->data);           
        // pHeader->pNext->pNext->data等同于p2->data
        return 0;
    }
    
    //将创建节点的代码封装成一个函数
    //封装时的关键点就是函数的接口(函数参数和返回值)的设计
    // 作用:创建一个链表节点
    // 返回值:指针,指针指向我们本函数新创建的一个节点的首地址
    struct node * create_node(int data)
    {
        struct node *p = (struct node *)malloc(sizeof(struct node));
        if (NULL == p)
        {
            printf("malloc error.\n");
            return NULL;
        }
        // 清理申请到的堆内存
        bzero(p, sizeof(struct node));
        // 填充节点
        p->data = data;
        p->pNext = NULL;  
         
        return p;
    }
    
    //从链表头部插入新节点
    void insert_head(struct node *pH, struct node *new)
    {
        // 第1步: 新节点的next指向原来的第一个节点
        new->pNext = pH->pNext;
        // 第2步: 头节点的next指向新节点的地址
        pH->pNext = new;
        // 第3步: 头节点中的计数要加1
        pH->data += 1;
    }
    
    //从链表尾部插入新节点
    //尾部插入简单点,因为前面已经建立好的链表不用动。直接动最后一个就可以了。
    //思路:由头指针向后遍历,直到走到原来的最后一个节点。原来最后一个节点里面的pNext是NULL,现在我们只要将它改成new就可以了。添加了之后新节点就变成了最后一个。
    // 计算添加了新的节点后总共有多少个节点,然后把这个数写进头节点中。
    void insert_tail(struct node *pH, struct node *new)
    {
        int cnt = 0;
        // 分两步来完成插入
        // 第一步,先找到链表中最后一个节点
        struct node *p = pH;
        while (NULL != p->pNext)
        {
            p = p->pNext;              // 往后走一个节点
            cnt++;
        }
        // 第二步,将新节点插入到最后一个节点尾部
        p->pNext = new;
        pH->data = cnt + 1;
    }
    
    //删除节点
    // 从链表pH中删除节点,待删除的节点的特征是数据区等于data
    // 返回值:当找到并且成功删除了节点则返回0,当未找到节点时返回-1
    int delete_node(struct node*pH, int data)
    {
        // 找到这个待删除的节点,通过遍历链表来查找
        struct node *p = pH;            // 用来指向当前节点
        struct node *pPrev = NULL;      // 用来指向当前节点的前一个节点
        while (NULL != p->pNext)      // 是不是最后一个节点
        {
            pPrev = p;                    // 在p走向下一个节点前先将其保存
            p = p->pNext;              // 走到下一个节点,也就是循环增量
            // 判断这个节点是不是我们要找的那个节点
            if (p->data == data)
            {
                // 找到了节点,处理这个节点
                // 分为2种情况,一个是找到的是普通节点,另一个是找到的是尾节点
                // 删除节点的困难点在于:通过链表的遍历依次访问各个节点,找到这个节点
                // 后p指向了这个节点,但是要删除这个节点关键要操作前一个节点,但是这
                // 时候已经没有指针指向前一个节点了,所以没法操作。解决方案就是增加
                // 一个指针指向当前节点的前一个节点
                if (NULL == p->pNext)
                {
                    // 尾节点
                    pPrev->pNext = NULL;       // 原来尾节点的前一个节点变成新尾节点
                    free(p);                    // 释放原来的尾节点的内存
                }
                else
                {
                    // 普通节点
                    pPrev->pNext = p->pNext;    // 要删除的节点的前一个节点和它的后一个节点相连,这样就把要删除的节点给摘出来了
                    free(p);
                }
                // 处理完成之后退出程序
                return 0;
            }
        }
        // 到这里还没找到,说明链表中没有我们想要的节点
        printf("没找到这个节点.\n");
        return -1;
    }
    
    //单链表逆序
    // 将pH指向的链表逆序
    void reverse_linkedlist(struct node *pH)
    {
        struct node *p = pH->pNext;      // pH指向头节点,p指向第1个有效节点
        struct node *pBack;               // 保存当前节点的后一个节点地址 
        // 当链表没有有效节点或者只有一个有效节点时,逆序不用做任何操作
        if ((NULL ==p) || (NULL == p->pNext))
            return;
        // 当链表有2个及2个以上节点时才需要真正进行逆序操作
        while (NULL != p->pNext)      // 是不是最后一个节点
        {
            // 原链表中第一个有效节点将是逆序后新链表的尾节点,尾节点的pNext指向NULL
            pBack = p->pNext;          // 保存p节点后面一个节点地址
            if (p == pH->pNext)
            {
                // 原链表第一个有效节点
                p->pNext = NULL;
            }
            else
            {
                // 原链表的非第1个有效节点
                p->pNext = pH->pNext;
            }
            pH->pNext = p;
             
            //p = p->pNext;        // 这样已经不行了,因为p->pNext已经被改过了
            p = pBack;            // 走到下一个节点
        }
        // 循环结束后,最后一个节点仍然缺失
        insert_head(pH, p);
    }
    
    //双链表的封装和编程实现
    #include <stdio.h>
    #include <stdlib.h>
    // 双链表的节点
    struct node 
    {
        int data;                  // 有效数据
        struct node *pPrev;           // 前向指针,指向前一个节点
        struct node *pNext;           // 后向指针,指向后一个节点
    };
    struct node *create_node(int data)
    {
        struct node *p = (struct node *)malloc(sizeof(struct node));
        if (NULL == p)
        {
            printf("malloc error.\n");
            return NULL;
        }
        p->data = data;
        p->pPrev = NULL;
        p->pNext = NULL;       // 默认创建的节点前向后向指针都指向NULL
        return p;
    }
    int main(void)
    {
        struct node *pHeader = create_node(0);      // 头指针
        return 0;
    }
    
    //双链表的算法之插入节点
    //尾部插入
    // 将新节点new插入到链表pH的尾部
    void insert_tail(struct node *pH, struct node *new)
    {
        // 第一步先走到链表的尾节点
        struct node *p = pH;
        while (NULL != p->pNext)
        {
            p = p->pNext;          // 第一次循环走过了头节点
        }
        // 循环结束后p就指向了原来的最后一个节点
        // 第二步:将新节点插入到原来的尾节点的后面
        p->pNext = new;                // 后向指针关联好了。新节点的地址和前节点的next
        new->pPrev = p;                // 前向指针关联好了。新节点的prev和前节点的地址
                                    // 前节点的prev和新节点的next指针未变动
    }
    //头部插入
    // 将新节点new前插入链表pH中。
    // 算法参照图示进行连接,一共有4个指针需要赋值。注意的是顺序。
    void insert_head(struct node *pH, struct node *new)
    {
        // 新节点的next指针指向原来的第1个有效节点的地址
        new->pNext = pH->pNext;
        // 原来第1个有效节点的prev指针指向新节点的地址
        if (NULL != pH->pNext)
            pH->pNext->pPrev = new;
        // 头节点的next指针指向新节点地址
        pH->pNext = new;
        // 新节点的prev指针指向头节点的地址
        new->pPrev = pH;
    }
    
    //双链表遍历节点
    // 后向遍历一个双链表
    void bianli(struct node *pH)
    {
        struct node *p = pH;
         
        while (NULL != p->pNext)
        {
            p = p->pNext;
             
            printf("data = %d.\n", p->data);
        }
    }
     
    // 前向遍历一个双遍历,参数pTail要指向链表末尾
    void qianxiang_bianli(struct node *pTail)
    {
        struct node *p = pTail; 
        while (NULL != p->pPrev)
        {
            printf("data = %d.\n", p->data);  
            p = p->pPrev;
        }
    }
    
    //双链表删除节点
    // 从链表pH中删除一个节点,节点中的数据是data
    int delete_node(struct node *pH, int data)
    {
        struct node *p = pH;
        if (NULL == p)
        {
            return -1;
        }     
        while (NULL != p->pNext)
        {
            p = p->pNext;
            // 在这里先判断当前节点是不是我们要删除的那个节点
            if (p->data == data)
            {
                // 找到了,删除之。当前上下文是:当前节点为p
                if (NULL == p->pNext)
                {
                    // 尾节点
    // p表示当前节点地址,p->pNext表示后一个节点地址,p->pPrev表示前一个节点的地址
                    p->pPrev->pNext = NULL;
                    //p->pPrev = NULL;         可以省略,因为后面整个都被销毁了
                    // 销毁p节点
                    //free(p);
                }
                else
                {
                    // 不是尾节点,普通节点
                    // 前一个节点的next指针指向后一个节点的首地址
                    p->pPrev->pNext = p->pNext;
                    // 当前节点的prev和next指针都不用管,因为后面会整体销毁整个节点
                    // 后一个节点的prev指针指向前一个节点的首地址
                    p->pNext->pPrev = p->pPrev;
                    //free(p);
                }
                free(p);
                return 0;
            }
        }
        printf("未找到目标节点.\n");
        return -1;
    }
    

    案例六

    深入解析条件变量(condition variables)
    条件变量是线程的另外一种同步机制,这些同步对象为线程提供了会合的场所,理解起来就是两个(或者多个)线程需要碰头(或者说进行交互-一个线程给另外的一个或者多个线程发送消息),我们指定在条件变量这个地方发生,一个线程用于修改这个变量使其满足其它线程继续往下执行的条件,其它线程则接收条件已经发生改变的信号。

    条件变量同锁一起使用使得线程可以以一种无竞争的方式等待任意条件的发生。所谓无竞争就是,条件改变这个信号会发送到所有等待这个信号的线程。而不是说一个线程接受到这个消息而其它线程就接收不到了。

    #include <pthread.h>
    struct msg {
    struct msg *m_next;
    /* ... more stuff here ... */
    };
    struct msg *workq;
    pthread_cond_t qready = PTHREAD_COND_INITIALIZER;
    pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER;
    void process_msg(void)
    {
        struct msg *mp;
        for (;;) {
        pthread_mutex_lock(&qlock);
        while (workq == NULL)
            pthread_cond_wait(&qready, &qlock);
        mp = workq;
        workq = mp->m_next;
        pthread_mutex_unlock(&qlock);
        /* now process the message mp */
        }
    }
    void enqueue_msg(struct msg *mp)
    {
        pthread_mutex_lock(&qlock);
        mp->m_next = workq;
        workq = mp;
        pthread_mutex_unlock(&qlock);
        pthread_cond_signal(&qready);
    }
    
    展开全文
  • 定义:Future模式类似商品订单,客户下单后,卖家...案例:JDK内部实现Future模式,FutureTask类 代码实现: 主业务(耗时较长): @Service public class FutureService{ public String dealRealData(String key...

    定义:Future模式类似商品订单,客户下单后,卖家在准备商品和物流配送过程需要很多时间,这段时间内客户可以先做其他事,收到订单后,再处理订单;

    案例:JDK内部实现的Future模式,FutureTask类

    代码实现:

    主业务(耗时较长):

    @Service
    public class FutureService{
        public String dealRealData(String key,int[] data) throws InterruptedException {
            //耗时较长
            Thread.sleep(1000);
            System.out.println(key+":正在处理真实的业务逻辑");
            int a=0;
            for(int i=0,len=data.length;i<len;i++){
                a=a+data[i];
            }
            return a+"";
        }
    }
    
    

    辅助业务,与主业务数据无关联

    @Service
    public class FutureClicent {
        public void deal(String key){
            //再真实业务处理时,可以在等待主要业务进行的同时,先干点其他的事
            System.out.println(key+":正在处理辅助辅助业务");
        }
    }
    
    

    主业务代理工具

    public class FutureUtil implements Callable {
        private FutureService futureService;
        private String key;
        private int[] data;
        public FutureUtil(FutureService futureService,String key,int[] data){
            this.futureService=futureService;
            this.key=key;
            this.data=data;
        }
        @Override
        public Object call() throws Exception {
            //处理真是业务逻辑
            return futureService.dealRealData(key,data);
        }
    }
    

    客户端调用:

    @RestController
    @RequestMapping(value = "/chat/app/test")
    public class TestAppController {
        private FutureService futureService;
        private FutureClicent futureClicent;
        public TestAppController(FutureService futureService, FutureClicent futureClicent){
            this.futureService=futureService;
            this.futureClicent=futureClicent;
        }
         @PostMapping(value="test")
         public void Test(){
            deal("A",new int[]{1,2,3});
            deal("B",new int[]{7,8,2,3});
         }
    
         private void deal(String key,int[] b){
            new Thread(){
                @Override
                public void run() {
                    FutureTask<String> futureTask=new FutureTask<String>(new FutureUtil(futureService,key,b));
                    ExecutorService service= Executors.newFixedThreadPool(1);
                    service.submit(futureTask);
                    futureClicent.deal(key);
                    try {
                        System.out.println(key+":处理完结果,返回主要业务逻辑结果:"+futureTask.get());
                        System.out.println(key+":结束");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (ExecutionException e) {
                        e.printStackTrace();
                    }
                }
            }.start();
         }
    }
    
    

    实现结果:

    A:正在处理辅助辅助业务
    B:正在处理辅助辅助业务
    A:正在处理真实的业务逻辑
    A:处理完结果,返回主要业务逻辑结果:6
    A:结束
    B:正在处理真实的业务逻辑
    B:处理完结果,返回主要业务逻辑结果:20
    B:结束
    
    展开全文
  • IBM 在互联网医疗领域定位:怎么样利用数据挖掘和人工智能技术,从海量医疗... 计算机处理能力飞跃发展:Hadoop、Spark这样的并行计算,还是像GPU、FPGA这样硬件加速发展,计算机处理能力有了性能上飞...
  • 一 概念说明 1 模型 并行复制是典型生产者、消费者模式,Coordinator作为生产者,worker线程作为消费者。 2 Waiting for preceding transaction to commit 当前事务无法和正在回放事务并发回放出现等待 二 ...
  • 文章目录什么是设计模式架构模式设计模式代码模式(成例 Idiom)单例模式普通单例假如单例中有某个字段改进的单例代理模式再升级不变模式不变模式是如何实现的不变模式的案例Future模式核心思想是异步调用举个栗子JDK...
  • CameraLink标准是在ChannelLink标准基础上多加了6对差分信号线,其中4对用于并行传输相机控制信号,另外2对用于相机和图像采集卡之间串行通信(本质就是UART两根线)。 CameraLink标准视频传输模式分为三种:...
  • Master-Worker 模式是常用的并行计算模式。它核心思想是系统由两类进程协同工作,Master和Worker进程。Master负责接收和分配任务,Worker负责处理子任务。当各个Worker子进程处理完毕后,会将结果返回给Master,由...
  • R与并行计算(转)

    2016-09-09 09:33:00
    之后作者从R用户的使用角度讨论了隐式和显示两种并行计算模式,并给出了相应的案例。隐式并行计算模式不仅提供了简单清晰的使用方法,而且很好的隐藏了并行计算的实现细节。因此用户可以专注于问题本身。显示并行...
  • Hadoop-Mapreduce 1. MapReduce 介绍 ...Map负责“分”,即把复杂任务分解为若干个“简单任务”来并行处理。可以进行拆分前提是这些小任务可以并行计算,彼此间几乎没有依赖关系。 Reduce负责“合”,即...
  • Local模式就是运行在一台计算机 上的模式,通常就是用于在本机.上练手和测试。 它可以通过以下集中方式设置Master。 local:所有计算都运行在一- 个线程当中,没有任何并行计算,通常我们在本机执行 一-些测试代码,...
  • 背景继前面文章《MySQLCOMMIT_ORDER模式下组提交分组实现与BUG案例源码剖析》分析后(以下补充分析流程图,具体分析过程,请转回原文阅读),原本以为对COMMIT_ORDER模式组提交已经理解得很透彻,但是没过几...
  • 为降低深度学习程序开发难度,提出了一种基于Julia云平台交互式深度学习模式。设计了一套深度学习原语,用Julia实现原语,为Julia程序员提供调用接口,构建交互分析系统;并提供GPU/CPU实现接口,由云端系统根据用户...
  • 很久没写爬虫了,一个经典Python爬虫例子,Python多线程爬虫例子案例,目标网站结构比较简单,适合练手使用,采用了经典生产者和消费者模式,同时结合python类和装饰器使用,应该能够让你获益不少。...
  • 2. 集群模式Spark Streaming难点解决案例分享 怎么找出前课想象原因? 每个batch duration内处理中,saveAsTextFiles只有一个job。reduceByKey的并行度改为8,shuffle时也确实有8个task。 虽然设置job并行...
  • 数字资产区块链入行的大佬越来越多,在资金、技术型人才的大量投入下,数字资产区块链行业得到了迅猛的发展,而作为造就...对此,自然是有成功案例的,当然失败的案例占多数,正所谓风险与利益并行,投资者自还是络绎不
  • 导读: intel辐射度优化方案,非常值得借鉴。不过对于基于硬件辐射度算法而言,使用这个模式还需要考虑CPU和GPU的并行优化问题。本文转自 http://softwarecommunity-zho.intel.com/articles/zho/31066.htm
  • 2. 集群模式Spark Streaming难点解决案例分享 怎么找出前课想象原因? 每个batch duration内处理中,saveAsTextFiles只有一个job。reduceByKey的并行度改为8,shuffle时也确实有8个task。 虽然设置job并行...
  • MATLAB神经网络43个案例分析源代码&数据 《MATLAB 神经网络43个案例分析》目录 ...第42章 并行运算与神经网络——基于CPU/GPU的并行神经网络运算 第43章 神经网络高效编程技巧——基于MATLAB R2012b新版本特性探讨
  • 《MATLAB 神经网络43个案例分析》目录 第1章 BP神经网络数据分类——语音特征...第42章 并行运算与神经网络——基于CPU/GPU的并行神经网络运算 第43章 神经网络高效编程技巧——基于MATLAB R2012b新版本特性探讨
  • 铁通天津城域网络在互联时采用是思科公司独有千兆以太网通道技术,它是链路带宽扩容一条重要途径,用于将多条并行链路带宽叠加起来。这项 技术能够把2-4组千兆端口之间连接带宽聚合在一起,在全双工工作...

空空如也

空空如也

1 2 3 4 5 6
收藏数 105
精华内容 42
关键字:

并行模式的案例