精华内容
下载资源
问答
  • c++中多线程编程是不是线程数越多越好

    千次阅读 多人点赞 2013-10-05 12:41:26
    多线程编程可以提高程序的并发执行能力,那是不是线程越多越好呢?

    多线程编程可以提高程序的并发执行能力,那是不是线程越多越好呢?

    大家可以参考下面的代码做测试:

    #define  MAX_WORKTHREAD		5
    map<int, int> m_task;//用于统计每个线程做的任务数
    std::deque<int> m_MsgQueue;//处理的消息队列
    	for (i=0; i<MAX_WORKTHREAD; i++)
    	{
    		int* nTemp = new int;
    		*nTemp = i;
    		hWTHandle[i] = CreateThread(NULL, 0, WorkThread, nTemp, 0, &g_dwWorkThread[i]); 
    		WorkThreadCout ++;
    		m_task[i] = 0;
    		Sleep(100);
    	}
    	
    DWORD WINAPI WorkThread(LPVOID lpParam)  
    {   
    	// cout << "No." << g_dwThreadID << " thread is running." << endl;  
    	while (TRUE)  
    	{  
    		int* nRemp = (int*)lpParam;
    
    		int n = -1;
    
    		EnterCriticalSection(&g_cs);
    		//cout << "No " << *nRemp << ", " << g_dwWorkThread[*nRemp] << " thread is running." << endl;
    		LeaveCriticalSection(&g_cs);
    
    		EnterCriticalSection(&g_MsgQueue);
    		if (!m_MsgQueue.empty())
    		{
    			n = m_MsgQueue.front();
    			m_MsgQueue.pop_front();
    		
    			m_task[*nRemp]++;
    		}
    		LeaveCriticalSection(&g_MsgQueue);
    
    		EnterCriticalSection(&g_cs);
    		cout << "No:" << *nRemp << ", " << n << endl;
    		LeaveCriticalSection(&g_cs);
    
    		Sleep(2000);//备注1
    
    	}  
    
    	cout << "No " << lpParam << " end" << endl;
    
    	return 0;  
    }

    当任务执行完了后,可以打印m_task里的数据:

    std::map<int, int>::iterator IterCount;
    for(IterCount=m_task.begin(); IterCount!=m_task.end();IterCount++)
    {
    	int nThreadId= (*IterCount).first;
    	int nCount = (*IterCount).second;
    					
    	EnterCriticalSection(&g_cs);
    	cout << "nThreadId:" << nThreadId << ", nCount" << nCount<<endl;
    	LeaveCriticalSection(&g_cs);
    
    	(*IterCount).second = 0;
    }


    可以修改备注1处的Sleep(2000),分别改为1000,3000,5000,10000等看看结果有什么区别?这里的时间其实是模拟执行一个任务所需要的时间,也可以用随机函数随机产生。

    另外,多线程操作全局变量的时候,一定要用线程同步(如临界区等)来操作,否则,会有问题。在使用的过程中,一定要注意资源和线程之间的关系,避免死锁发生。

    经过测试,多线程中的线程数是不是越多越好?这个问题的答案应该就有了。


    转载请注明原创链接:http://blog.csdn.net/wujunokay/article/details/12307773







    展开全文
  • C++编程中是不是线程越多越好

    千次阅读 2014-03-09 09:20:24
    多线程编程可以提高程序的并发执行能力,那是不是线程越多越好呢? 大家可以参考下面的代码做测试: [cpp] view plaincopy #define MAX_WORKTHREAD 5  mapint, int> m_task;//...

    多线程编程可以提高程序的并发执行能力,那是不是线程越多越好呢?

    大家可以参考下面的代码做测试:

    1. #define  MAX_WORKTHREAD     5  
    2. map<intint> m_task;//用于统计每个线程做的任务数  
    3. std::deque<int> m_MsgQueue;//处理的消息队列  
    4.     for (i=0; i<MAX_WORKTHREAD; i++)  
    5.     {  
    6.         int* nTemp = new int;  
    7.         *nTemp = i;  
    8.         hWTHandle[i] = CreateThread(NULL, 0, WorkThread, nTemp, 0, &g_dwWorkThread[i]);   
    9.         WorkThreadCout ++;  
    10.         m_task[i] = 0;  
    11.         Sleep(100);  
    12.     }  
    13.       
    14. DWORD WINAPI WorkThread(LPVOID lpParam)    
    15. {     
    16.     // cout << "No." << g_dwThreadID << " thread is running." << endl;    
    17.     while (TRUE)    
    18.     {    
    19.         int* nRemp = (int*)lpParam;  
    20.   
    21.         int n = -1;  
    22.   
    23.         EnterCriticalSection(&g_cs);  
    24.         //cout << "No " << *nRemp << ", " << g_dwWorkThread[*nRemp] << " thread is running." << endl;  
    25.         LeaveCriticalSection(&g_cs);  
    26.   
    27.         EnterCriticalSection(&g_MsgQueue);  
    28.         if (!m_MsgQueue.empty())  
    29.         {  
    30.             n = m_MsgQueue.front();  
    31.             m_MsgQueue.pop_front();  
    32.           
    33.             m_task[*nRemp]++;  
    34.         }  
    35.         LeaveCriticalSection(&g_MsgQueue);  
    36.   
    37.         EnterCriticalSection(&g_cs);  
    38.         cout << "No:" << *nRemp << ", " << n << endl;  
    39.         LeaveCriticalSection(&g_cs);  
    40.   
    41.         Sleep(2000);//备注1  
    42.   
    43.     }    
    44.   
    45.     cout << "No " << lpParam << " end" << endl;  
    46.   
    47.     return 0;    
    48. }  

    当任务执行完了后,可以打印m_task里的数据:

    1. std::map<intint>::iterator IterCount;  
    2. for(IterCount=m_task.begin(); IterCount!=m_task.end();IterCount++)  
    3. {  
    4.     int nThreadId= (*IterCount).first;  
    5.     int nCount = (*IterCount).second;  
    6.                       
    7.     EnterCriticalSection(&g_cs);  
    8.     cout << "nThreadId:" << nThreadId << ", nCount" << nCount<<endl;  
    9.     LeaveCriticalSection(&g_cs);  
    10.   
    11.     (*IterCount).second = 0;  
    12. }  


    可以修改备注1处的Sleep(2000),分别改为1000,3000,5000,10000等看看结果有什么区别?这里的时间其实是模拟执行一个任务所需要的时间,也可以用随机函数随机产生。

    另外,多线程操作全局变量的时候,一定要用线程同步(如临界区等)来操作,否则,会有问题。在使用的过程中,一定要注意资源和线程之间的关系,避免死锁发生。

    经过测试,多线程中的线程数是不是越多越好?这个问题的答案应该就有了。






    原创链接:http://blog.csdn.net/wujunokay/article/details/12307773

    展开全文
  • Java应用中线程是不是开的越多越好,开多少合适,如何减少上下文切换开销?,如何写个shell脚本获取上下文切换的开销?
  • Java多线程详解

    2019-08-26 23:20:52
    多线程 什么是进程? 正在执行的程序 什么是线程? 进程的子单位,一个能够完成独立功能的执行路径 为什么需要开启多线程 当执行某些耗时操作的任务...开启多线程是不是越多越好,提高了效率还是降低了效率? 不是...

    多线程


    什么是进程?

    正在执行的程序

    什么是线程?

    进程的子单位,一个能够完成独立功能的执行路径

    为什么需要开启多线程

    • 当执行某些耗时操作的任务的时候需要开启多线程,防止线程阻塞
    • 能够让两个任务看起来像在同时执行
    • 提高CPU的使用率,进而提高进程和内存的使用率

    为什么开启多线程会同时执行

    因为CPU切换执行的速度太快了,肉眼无法察觉

    开启多线程是不是越多越好,提高了效率还是降低了效率?

    不是,线程越多,效率越慢,但是太少,浪费CPU资源,所以,合理利用CPU

    并发和并行的区别

    • 并发:在同一时间段下同时执行多个线程,看起来像同时执行
    • 并行:在同一时间刻度下(不能够在分割的时间单位)执行多个线程,本质上就是同时执行

    CPU在某个最小时间刻度单位下,执行的是一个进程的一个线程的一个不可分割原子性语句

    举例:a++是线程安全的吗?不是

    Java虚拟机的启动至少开启了两条线程,主线程和垃圾回收线程

    一个线程可理解为进程的子任务

    开启线程

    线程的启动方式本质有两种:

    1. 继承Tread类的方式
    2. 实现Runnable的方式

    方式一:继承Thread类

    1. 自定义类MyThread继承Thread
    2. MyThread类里面重写run()方法
    3. 创建线程对象
    4. 启动线程

    注意:

    1. 启动线程使用的是start()方法而不是run()方法
    2. 线程不能多次启动

    方式二:实现Runnable接口

    1. 自定义类MyRunnable实现Runnable接口
    2. 重写run()方法
    3. 创建MyRunnable类的对象
    4. 创建Thread类的对象,并把步骤3创建的对象作为构造参数传递启动线程

    实现接口方式的好处

    1. 可以避免由于Java单继承带来的局限性
    2. 适合多个相同程序的代码去处理同一个资源的情况,把线程同程序的代码,较好的体现了面向对象的设计思想

    方式三:实现Callable方式开启线程

    继承Thread和实现Runnable的方式的特点:

    1. 没有返回结果
    2. 没有异常

    CallableRunnable的区别:

    1. Runnable无返回值,没有异常抛出
    2. Callable可以在启动线程中获取返回值,以及接受子线程的异常

    线程间的数据传递:线程通信

    A线程中开启了B线程

    A——>B 通过构造方法

    B——>A 通过Callable方式

    public class ThreadDemo03 {
    	public static void main(String[] args) {
    		FutureTask<Integer> task = new FutureTask<>(new CalculateCallable(1,100));
    		Thread t = new Thread(task);
    		t.start();
    		
    		try {
    			Integer i = task.get();
    			System.out.println("计算结果: " + i);
    		} catch (InterruptedException | ExecutionException e) {
    			e.printStackTrace();
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    		
    		System.out.println("over");
    	}
    }
    
    class CalculateCallable implements Callable<Integer> {
    	
    	private int m;
    	private int n;
    	
    	public CalculateCallable(int m, int n) {
    		super();
    		this.m = m;
    		this.n = n;
    	}
    	
    	public CalculateCallable() {
    		super();
    	}
    
    	@Override
    	public Integer call() throws Exception {
    		int sum = 0;
    		for (int i = m; i <= n; i++) {
    			System.out.println("子线程: " + i);
    			sum += i;
    			throw new NullPointerException("空空异常!!!");
    		}
    		return sum;
    	}
    
    	public int getM() {
    		return m;
    	}
    
    	public void setM(int m) {
    		this.m = m;
    	}
    
    	public int getN() {
    		return n;
    	}
    
    	public void setN(int n) {
    		this.n = n;
    	}
    	
    } 
    

    匿名内部类的方式开启线程

    注意:当继承Thread和实现Runnable方式同时实现,继承Thread优先

    public class ThreadDemo04 {
    	public static void main(String[] args) {
    		new Thread(); // 匿名线程对象
    		
    		// 方式一继承Thread方式开启线程
    		new Thread() {
    			public void run() {
    				for (int i = 0; i < 100; i++) {
    					System.out.println("A.继承Thread方式:" + i);
    				}
    			}
    		}.start();
    		
    		// 方式二实现Runnable接口的方式开启线程
    		new Thread(new Runnable() {
    			
    			@Override
    			public void run() {
    				for (int i = 0; i < 100; i++) {
    					System.out.println("B.实现Runnable接口的方式:" + i);
    				}
    			}
    		}).start();
    		
    		
    		for (int i = 0; i < 100; i++) {
    			System.out.println("C.主线程:" + i);
    		}
    		
    		new Thread(new Runnable() {
    			
    			@Override
    			public void run() {
    				for (int i = 0; i < 100; i++) {
    					System.out.println("D.实现Runnable接口的方式:" + i);
    				}
    			}
    		}) {
    			public void run() {
    				for (int i = 0; i < 100; i++) {
    					System.out.println("E.继承Thread方式:" + i);
    				}
    			}
    		}.start();
    	}
    }
    
    

    Lambda表达式开启线程

    什么是Lambda表达式 :一种函数式接口的新的写法,本质还是匿名内部类,但是这个父类是函数式接口

    什么是函数式接口

    只有一个抽象方法的接口称为函数式接口

    Runnable,FileFilter

    Lambda表达式的语法:

    ​ ()—> {}

    ​ () 小括号里面是参数列表,如果一个函数式接口中的抽象方法没有参数,这里可以不写参数

    ​ —>固定格式

    ​ {}重写函数式接口的抽象方法的方法体

    ​ 如果方法体中只有一条语句,{}可以省略不写,如果返回值只有一条语句,return关键字可以省略

    public class ThreadDemo05 {
    	public static void main(String[] args) {
    		new Demo().method(new ITest() {
    			
    			@Override
    			public void show() {
    				System.out.println("show");
    			}
    		});
    		
    		new Demo().method(()->System.out.println("Lambda表达式的 show"));
    		
    		new Demo().method((a, b)->System.out.println(a + "|" + b));
    		
    		new Demo().method((a, b)->a + b);
    		
    		// com.sxt.threaddemo
    		File f = new File("src/com/sxt/threaddemo");
    		/*File[] files = f.listFiles(new FileFilter() {
    			
    			@Override
    			public boolean accept(File f) {
    				return f.isFile() && f.getName().endsWith(".java");
    			}
    		});*/
    		
    		/*File[] files = f.listFiles((file) -> file.isFile() && file.getName().endsWith(".java"));
    		
    		for (File file : files) {
    			System.out.println(file);
    		}*/
    		ArrayList<String> list = new ArrayList<>();
    		list.add("张三丰");
    		list.add("李四");
    		list.add("赵六");
    		list.add("王五哈哈哈");
    		
    		list.forEach((t)->{
    			System.out.println(t);
    		});
    		
    		list.forEach(new Consumer<String>() {
    
    			@Override
    			public void accept(String t) {
    				System.out.println(t);
    			}
    		});
    		
    		list.removeIf((t)-> t.length() == 2);
    		System.out.println(list);
    		
    		/*
    		 * new Consumer<String>() {
    
    			@Override
    			public void accept(String t) {
    				System.out.println(t);
    			}
    		}
    		
    		Consumer<? super E> action = new Consumer<String>() {
    
    			@Override
    			public void accept(String t) {
    				System.out.println(t);
    			}
    		}
    		
    		public void forEach(Consumer<? super E> action) {
    	        Objects.requireNonNull(action);
    	        final int expectedModCount = modCount;
    	        @SuppressWarnings("unchecked")
    	        final E[] elementData = (E[]) this.elementData;
    	        final int size = this.size;
    	        for (int i=0; modCount == expectedModCount && i < size; i++) {
    	            action.accept(elementData[i]);
    	        }
    	        if (modCount != expectedModCount) {
    	            throw new ConcurrentModificationException();
    	        }
    	    }
    		 */
    		
    		new Thread(()->{
    //			System.out.println("Lambda表达式开启线程");
    			for (int i = 0; i < 10000; i++) {
    				System.out.println("Lambda:" + i);
    			}
    		}).start();
    		
    		for (int i = 0; i < 10000; i++) {
    			System.out.println("main:" + i);
    		}
    	}
    }
    
    @FunctionalInterface
    interface ITest {
    	void show();
    //	void test();
    }
    
    @FunctionalInterface
    interface IDemo {
    	void show(int a, String b);
    //	void test();
    }
    
    @FunctionalInterface
    interface IShow {
    	int add(int a, int b);
    //	void test();
    }
    class Demo {
    	/*
    	 * ITest test = new ITest() {
    			
    			@Override
    			public void show() {
    				System.out.println("show");
    			}
    		};
    	 */
    	public void method(ITest test) {
    		test.show();	
    	}
    	
    	public void method(IDemo d) {
    		d.show(10, "sss");	
    	}
    	
    	public void method(IShow s) {
    		int add = s.add(100, 200);
    		System.out.println(add);
    	}
    }
    
    

    设置和获取线程名称的几种方式

    1. 通过构造方法
    2. 通过set/get方法
    3. 通过静态方法

    主线程的名称叫做:main

    public class ThreadDemo01 {
    	public static void main(String[] args) {
    		/*MyThread t = new MyThread();
    		t.start();
    		
    		MyThread t2 = new MyThread();
    		t2.start();*/
    		
    		/*MyThread t3 = new MyThread("隔壁老王");
    		t3.start();*/
    		/*Thread t3 = new Thread("隔壁老王");
    		t3.start();*/
    		
    		/*MyThread t3 = new MyThread();
    		t3.setName("隔壁老王家");
    		t3.start();*/
    		
    		/*Thread.currentThread().setName("主线程");
    		String name = Thread.currentThread().getName();
    		System.out.println(name);*/
    		
    		Thread t4 = new Thread(new MyRunnable(), "隔壁老李");
    		t4.start();
    		
    		/*long mainId = Thread.currentThread().getId();
    		System.out.println(mainId);*/
    	}
    }
    
    class MyThread extends Thread {
    	/*public MyThread() {
    		super();
    	}
    	
    	public MyThread(String name) {
    		super(name);
    	}*/
    	@Override
    	public void run() {
    		for (int i = 0; i < 100; i++) {
    			System.out.println(this.getName() + ":" + i);
    		}
    	}
    }
    
    class MyRunnable implements Runnable {
    
    	@Override
    	public void run() {
    		for (int i = 0; i < 100; i++) {
    			System.out.println(Thread.currentThread().getName() + ":" + i);
    		}
    		System.out.println("子线程: " + Thread.currentThread().getId());
    	}
    	
    }
    

    线程的常用方法

    Java是如何对线程进行调度的?

    java使用的是抢占式调度模型

    抢占式调度模型

    优先让优先级高的线程使用CPU,如果线程的优先级相同,那么会随机选择一个,优先级高的线程获取CPU时间片相对多一些

    设置和获取线程的优先级

    public final int getPriority()
    public final void setPriority(int newPriority)
    

    线程休眠

    public static void sleep(long millis)
    

    可以用来制作时钟

    public class ThreadDemo03 {
    	public static void main(String[] args) {
    		Thread t = new Thread(new MyClockRunnable(), "北京时间 ");
    		t.start();
    	}
    }
    
    class MyClockRunnable implements Runnable {
    
    	@Override
    	public void run() {
    		while (true) {
    			String s = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
    			System.out.println(Thread.currentThread().getName() + ":" + s);
    			
    			try {
    				Thread.sleep(1000L);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    		}
    	}
    	
    }
    

    中断线程

    public final void stop
    public void interrupt
    

    stopinterrupt的区别?

    stop方法表示结束线程的生命

    interrupt表示向线程抛出一个InterruptedException异常

    //子线程睡眠10S中,主线程在子线程睡眠到第4S的时候,敲醒子线程
    public class ThreadDemo04 {
    	public static void main(String[] args) {
    		
    		Thread t = new Thread(new StopThread());
    		t.start();
    		System.out.println("主线程: 嘘,开始数,准备好");
    		try {
    			Thread.sleep(4000);
    			// t.stop();
    			t.interrupt();
    		} catch (InterruptedException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}
    	}
    }
    
    class StopThread implements Runnable {
    
    	@Override
    	public void run() {
    		String startTime = new SimpleDateFormat("开睡时间: yyyy-MM-dd HH:mm:ss").format(new Date());
    		System.out.println(startTime);
    		
    		try {
    			Thread.sleep(10000);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    			System.out.println("谁打我!!!");
    			System.out.println("打扰了,我自己来");
    			System.exit(0);
    		}
    		
    		String endTime = new SimpleDateFormat("觉醒时间: yyyy-MM-dd HH:mm:ss").format(new Date());
    		System.out.println(endTime);
    	}
    	
    }
    
    

    后台线程

    public final void setDaemon(boolean on)
    

    一般来说,JVMJAVA虚拟机)中一般会包括两种线程

    线程分类

    1. 用户线程
    2. 后台线程、守护线程、服务线程

    所谓后台线程(daemon)线程指的是:在程序运行的时候在后台提供的一种通用的服务的线程,并且这种线程并不属于程序中不可或缺的部分。因此,当所有的非后台线程结束的时候,也就是用户线程都结束的时候,程序也就终止了。同时,会杀死进程中的所有的后台线程。反过来说,只要有任何非后台线程还在运行,程序就不会结束。比如执行main()的就是一个非后台线程。基于这个特点,当虚拟机中的用户线程全部退出运行时,守护线程没有服务的对象后,JVM也就退出了。

    线程加入

    public final void join()
    

    代码示例:

    public class ThreadDemo06 {
    	public static void main(String[] args) {
    		JoinThread t1 = new JoinThread();
    		JoinThread t2 = new JoinThread();
    		JoinThread t3 = new JoinThread();
    		
    		t1.setName("刘备");
    		t2.setName("关羽");
    		t3.setName("张飞");
    		
    		t1.start();
    		try {
    			t1.join();
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    		t2.start();
    		try {
    			t2.join();
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    		t3.start();
    		
    		
    	}
    }
    
    class JoinThread extends Thread {
    	@Override
    	public void run() {
    		for (int i = 0; i < 100; i++) {
    			System.out.println(getName() + ":" + i);
    		}
    	}
    }
    

    线程礼让

    public static void yield()
    

    让正在执行的线程让出CPU的执行权一小会,t1t2互相争夺CPU的执行权,t1抢到了CPU的执行权,让出CPU的执行权,重新回到线程队列中继续抢夺CPU的执行权

    public class ThreadDemo07 {
    	public static void main(String[] args) {
    		YieldThread t1 = new YieldThread();
    		YieldThread t2 = new YieldThread();
    		
    		t1.setName("孔融");
    		t2.setName("孔融的哥哥");
    		
    		t1.start();
    		t2.start();
    	}
    }
    
    class YieldThread extends Thread { 
    	@Override
    	public void run() {
    		for (int i = 0; i < 100; i++) {
    			System.out.println(getName() + ":" + i);
    			Thread.yield();
    		}
    	}
    }
    
    

    线程同步

    需求:深圳罗湖火车站目前正在出售车票,共有100张票,而它有3个售票窗口售票, 请设计一个程序模拟该火车站售票。

    出现的问题
    1.卖出了同票
    窗口A正在出售第97张票
    窗口B正在出售第97张票
    窗口C正在出售第97张票

    2.卖出了负票
    窗口A正在出售第1张票
    窗口C正在出售第0张票
    窗口B正在出售第-1张票

    问题产生的原因: CPU在某一个最小的时间刻度单位下,执行的是一个进程的一个线程的一个不可再分割的原子性语句

    解决办法:
    1.同步代码块
    2.同步方法
    3.同步锁

    可能出现线程安全的问题的情况:
    1.存在多线程环境
    2.多个线程共享同一份数据
    3.多个线程操作同一份数据并且共享数据做了修改
    4.存在多条语句操作共享数据
    在多线程环境下,存在多条语句操作共享数据,并且对数据做了修改的操作,那么必定会出现线程安全问题

    解决办法: 将多条操作共享数据的语句 包裹起来,同步锁

    public class ThreadSynchronizationDemo {
    	public static void main(String[] args) {
    		/*SellTicketThread t1 = new SellTicketThread();
    		SellTicketThread t2 = new SellTicketThread();
    		SellTicketThread t3 = new SellTicketThread();*/
    		SellTicketThread st = new SellTicketThread();
    		Thread t1 = new Thread(st);
    		Thread t2 = new Thread(st);
    		Thread t3 = new Thread(st);
    		
    		t1.setName("窗口A");
    		t2.setName("窗口B");
    		t3.setName("窗口C");
    		
    		t1.start();
    		t2.start();
    		t3.start();
    		
    	}
    }
    
    /*
     * 解决办法方式一:同步代码块
    	格式:
    		synchronized(对象){需要同步的代码;}
    	同步的好处
    	解决了多线程的安全问题。
    	同步的弊端
    	当线程相当多时,因为每个线程都会去判断同步上的锁,这是很耗费资源的,降低程序的运行效率。如果出现了同步嵌套,就容易产生死锁问题
    	注意: 这里的对象是锁对象,可以是任意对象,但是必须多个线程共享同一个对象
    		 这种方式加锁的时间是进入代码之后,释放锁的时间是代码块执行结束的时候
    
     */
    /*class SellTicketThread implements Runnable {
    	private int tickets = 100;
    	@Override
    	public void run() {
    		while (true) {
    			synchronized (MyLock.LOCK) {
    				if (tickets > 0) {
    					try {
    						Thread.sleep(100L);
    					} catch (InterruptedException e) {
    						e.printStackTrace();
    					}
    					System.out.println(Thread.currentThread().getName() + "正在出售第" + (tickets--) + "张票");
    				} 
    			}
    			
    		}
    	}
    }*/
    
    enum MyLock {
    	LOCK
    }
    
    /*
     * 解决办法方式二:同步方法
    	格式:
    		public synchronized 返回值 方法名(参数列表) {
    	      		//需要同步的代码块
    	   	}
    	
    	如果锁对象是this,就可以考虑使用同步方法。
    	
    	如果方式静态方法, 当前类对应的字节码文件对象作锁 Class c = SellTicketThread.class
     */
    /*class SellTicketThread implements Runnable {
    	private static int tickets = 100;
    
    	@Override
    	public void run() {
    		while (true) {
    			sellTicket();
    		}
    	}
    	
    	public static synchronized void sellTicket() { 
    		if (tickets > 0) {
    			try {
    				Thread.sleep(100L);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    			System.out.println(Thread.currentThread().getName() + "正在出售第" + (tickets--) + "张票");
    		}
    	}
    }*/
    
    /*class SellTicketThread extends Thread {
    	
    	private static int tickets = 100;
    	
    	@Override
    	public void run() {
    		while (true) {
    			sellTicket();
    		}
    	}
    	
    	public static synchronized void sellTicket() {
    		if (tickets > 0) {
    			try {
    				Thread.sleep(100L);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    			System.out.println(Thread.currentThread().getName() + "正在出售第" + (tickets--) + "张票");
    		}
    	}
    }*/
    
    class SellTicketThread implements Runnable {
    	private static int tickets = 100;
    	Lock lock = new ReentrantLock();
    	@Override
    	public void run() {
    		while (true) {
    			lock.lock();
    			if (tickets > 0) {
    				try {
    					Thread.sleep(100L);
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    				System.out.println(Thread.currentThread().getName() + "正在出售第" + (tickets--) + "张票");
    			}
    			lock.unlock();
    		}
    	}
    }
    

    线程死锁

    • 死锁:指两个或者两个以上的线程在执行的过程中,因争夺资源产生的一种互相等待现象。
    • 代码演示死锁现象。
    public class DeadThradDemo {
    public static void main(String[] args) {
    	DieLock dl1 = new DieLock(true);
    	DieLock dl2 = new DieLock(false);
    dl1.start();
    dl2.start();
    }
    }
    class DieLock extends Thread {
    	private boolean flag;
    public DieLock() {
    }
    
    public DieLock(boolean flag) {
    	this.flag = flag;
    }
    
    @Override
    public void run() {
    	if (flag) {
    		// dl1进来
    		synchronized (TestLock.LOCKA) {
    			System.out.println("if 语句中 LockA锁"); // 就在输出完这句话之后被dl2抢到了资源
    			synchronized (TestLock.LOCKB) {
    				System.out.println("if 语句中 LockB锁");
    			}
    		}
    	} else {
    		// dl走else
    		synchronized (TestLock.LOCKB) {
    			System.out.println("else 语句中 lockB锁");
    			synchronized (TestLock.LOCKA) {
    				System.out.println("else 语句中 lockA锁");
    			}
    		}
    	}
    }
    }
    
    class TestLock {
    	public static final Object LOCKA = new Object();
    	public static final Object LOCKB = new Object();
    }
    

    线程池和线程组

    线程池和线程组的区别?

    线程组:

    线程组存在的意义,首要原因是安全。

    java默认创建的线程都是属于系统线程组,而同一个线程组的线程是可以相互修改对方的数据的。

    但如果在不同的线程组中,那么就不能“跨线程组”修改数据,可以从一定程度上保证数据安全.

    线程池:
    线程池存在的意义,首要作用是效率。
    线程的创建和结束都需要耗费一定的系统时间(特别是创建),不停创建和删除线程会浪费大量的时间。所以,在创建出一条线程并使其在执行完任务后不结束,而是使其进入休眠状态,在需要用时再唤醒,那么 就可以节省一定的时间。
    如果这样的线程比较多,那么就可以使用线程池来进行管理。保证效率。

    线程组和线程池共有的特点:

    1,都是管理一定数量的线程

    2,都可以对线程进行控制—包括休眠,唤醒,结束,创建,中断(暂停)–但并不一定包含全部这些操作。

    //线程组
    public class ThreadGroupDemo {
    	public static void main(String[] args) {
    		ThreadGroup tg = Thread.currentThread().getThreadGroup();
    		System.out.println(tg.getName());
    		
    		ThreadGroupRunnable tr = new ThreadGroupRunnable();
    		
    		ThreadGroup sgyy = new ThreadGroup("三国演义组");
    		// 第一组 三国演义组
    		Thread t1 = new Thread(sgyy, tr, "诸葛亮");
    		Thread t2 = new Thread(sgyy, tr, "司马懿");
    		Thread t3 = new Thread(sgyy, tr, "周瑜");
    		
    		ThreadGroup shz = new ThreadGroup("水浒传");
    		// 第一组 三国演义组
    		Thread t4 = new Thread(shz, tr, "李逵");
    		Thread t5 = new Thread(shz, tr, "宋江");
    		Thread t6 = new Thread(shz, tr, "卢俊义");
    		
    		/*t1.start();
    		t2.start();
    		t3.start();*/
    		List<Thread> threadList = new ArrayList<Thread>();
    		threadList.add(t1);
    		threadList.add(t2);
    		threadList.add(t3);
    		
    		// 批量设置为后台线程
    		sgyy.setDaemon(true);
    		
    		sgyy.stop();
    		
    		for (Thread thread : threadList) {
    			thread.start();
    		}
    		
    		System.out.println(t5.getThreadGroup().getName());
    	}
    }
    
    class ThreadGroupRunnable implements Runnable {
    
    	@Override
    	public void run() {
    		for (int i = 0; i < 100; i++) {
    			System.out.println(Thread.currentThread().getName() + ":" + i);
    		}
    	}
    	
    }
    

    Executors工厂类来产生线程池。

    //构造方法
    	public static ExecutorService newCachedThreadPool()
    	public static ExecutorService newFixedThreadPool(int nThreads)
    	public static ExecutorService newSingleThreadExecutor()
    

    示例:

    /*使用线程池 创建三个线程
    		1.打印 A-Z
    		2.计算m~n的和
    		3.拷贝文件*/
    public class ThreadPoolDemo {
    	public static void main(String[] args) throws InterruptedException, ExecutionException {
    		ExecutorService pool = Executors.newFixedThreadPool(3);
    		pool.submit(new MyRunnable());
    		Future<Integer> future = pool.submit(new MyCallable(1, 100));
    		pool.submit(new Runnable() {
    			
    			@Override
    			public void run() {
    				for (int i = 0; i < 100; i++) {
    					System.out.println(Thread.currentThread().getName() + ":" + i);
    				}
    			}
    		});
    		Integer i = future.get();
    		System.out.println("返回的结果: " + i);
    		
    		pool.shutdown();
    	}
    }
    
    class MyRunnable implements Runnable {
    
    	@Override
    	public void run() {
    		for (int i = 0; i < 100; i++) {
    			System.out.println(Thread.currentThread().getName() + ":" + i);
    		}
    	}
    	
    }
    
    class MyCallable implements Callable<Integer> {
    	
    	int m;
    	int n;
    
    	public MyCallable(int m, int n) {
    		super();
    		this.m = m;
    		this.n = n;
    	}
    
    	public MyCallable() {
    		super();
    	}
    
    	@Override
    	public Integer call() throws Exception {
    		int sum = 0;
    		for (int i = m; i <= n; i++) {
    			System.out.println(Thread.currentThread().getName() + ":" + i);
    			sum += i;
    		}
    		return sum;
    	}
    	
    }
    

    生产者与消费者模型

    对于此模型,应该明确一下几点:

    1、生产者仅仅在仓储未满时候生产,仓满则停止生产。

    2、消费者仅仅在仓储有产品时候才能消费,仓空则等待。

    3、当消费者发现仓储没产品可消费时候会通知生产者生产。

    4、生产者在生产出可消费产品时候,应该通知等待的消费者去消费。

    此模型将要结合java.lang.ObjectwaitnotifynotifyAll方法来实现以上的需求。这是非常重要的。

    下面通过代码来演示:

    商品

    //商品
    public class Toy {
    	private String toyName;
    	private int num;
    	// 表示是否有玩具, true表示有,false表示没有
    	private boolean flag;
    	public Toy() {
    		super();
    	}
    	public Toy(int num, String toyName, boolean flag) {
    		super();
    		this.num = num;
    		this.toyName = toyName;
    		this.flag = flag;
    	}
    	public String getToyName() {
    		return toyName;
    	}
    	public void setToyName(String toyName) {
    		this.toyName = toyName;
    	}
    	public int getNum() {
    		return num;
    	}
    	public void setNum(int num) {
    		this.num = num;
    	}
    	public boolean isFlag() {
    		return flag;
    	}
    	public void setFlag(boolean flag) {
    		this.flag = flag;
    	}
    	@Override
    	public String toString() {
    		return "Toy [toyName=" + toyName + ", num=" + num + ", flag=" + flag + "]";
    	}
    	
    	public synchronized void product(Toy t) {
    		// 先判断是否有玩具
    		if (this.isFlag()) {
    			try {
    				this.wait();
    			} catch (InterruptedException e) {
    				// TODO Auto-generated catch block
    				e.printStackTrace();
    			}
    		}
    		
    		// 没有,就生产玩具
    		this.setToyName(t.getToyName());
    		this.setNum(t.getNum());
    		
    		// 修改标志位,表示有玩具
    		this.setFlag(t.isFlag());
    		// 生产完毕之后,就通知消费者消费
    		this.notify();
    	}
    	
    	public synchronized void consume() {
    		// 先判断是否有玩具
    		if (!this.isFlag()) {
    			// wait方法
    			try {
    				// 没有玩具,就等待!
    				this.wait();
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    		}
    		
    		// 有玩具就消费
    		String toyName = this.getToyName();
    		int num = this.getNum();
    		System.out.println(toyName + "|" + num);
    		this.setNum(--num);
    		
    		// 消费完毕之后,就通知生产者生产
    		if (num <= 0) {
    			this.setFlag(false);
    			// 通知生产者生产
    			this.notify();
    		}
    	}
    }
    

    消费者

    // 消费者模型
    public class GetThread extends Thread {
    	
    	// 共享资源
    	private Toy t;
    	
    	public GetThread() {
    		super();
    	}
    	
    	public GetThread(Toy t) {
    		super();
    		this.t = t;
    	}
    
    	@Override
    	public void run() {
    		while (true) {
    			t.consume();
    		}
    	}
    
    	public Toy getT() {
    		return t;
    	}
    
    	public void setT(Toy t) {
    		this.t = t;
    	}
    	
    }
    

    生产者

    public class SetThread extends Thread {
    	private Toy t;
    	// 控制如果是偶数,就生产铁胆火车侠,如果是奇数,就生产金刚狼
    	public int i;
    
    	public SetThread() {
    		super();
    	}
    
    	public SetThread(Toy t) {
    		super();
    		this.t = t;
    	}
    	
    	@Override
    	public void run() {
    		while (true) {
    			synchronized (t) {
    				if (i % 2 == 0) {
    					t.product(new Toy(10, "火影忍者", true));
    				} else {
    					t.product(new Toy(5, "变形金刚", true));
    				}
    				i++;
    				
    			}
    		}
    		
    	}
    
    	public Toy getT() {
    		return t;
    	}
    
    	public void setT(Toy t) {
    		this.t = t;
    	}
    	
    	
    }
    

    volatile关键字

    所谓原子性,就是某系列的操作步骤要么全部执行,要么都不执行。

    比如,变量的自增操作 i++,分三个步骤:

    1.从内存中读取出变量 i 的值

    2.将 i 的值加1

    3.将 加1 后的值写回内存
    假设i的值为10,A线程执行到第二步, i变成11, B线程抢到了执行权,B读取内存中的数据还是10,执行第二步i=11

    最后结果是11,预期结果是12,线程不安全

    这说明 i++ 并不是一个原子操作。因为,它分成了三步,

    有可能当某个线程执行到了第2步时被中断了,那么就意味着只执行了其中的两个步骤,没有全部执行。
    以下程序期望的结果应该是: 100*100=10000,但是,实际上count并没有达到10000

    volatile修饰的变量并不保证对它的操作(自增)具有原子性。

    AtomicInteger

    1、保证变量在线程间可见,对volatile变量所有的写操作都能立即反应到其他线程中,换句话说,volatile变量在各个线程中是一致的

    2、禁止指令的重排序优化; 指令重排序 , 这里间接可以保证线程安全

    1:int a = 1;

    2:int b = 2;

    3:boolean flag = true;

    4.if(flag)

    指令重排序后:

    –>

    3:boolean flag = true;

    2:int b = 2;

    1:int a = 1;

    如何保证线程安全? volatile + synchronized

    synchronized和volatile的区别

    1.volatile轻量级,只能修饰变量。synchronized重量级,还可修饰方法

    2.volatile只能保证数据的可见性,不能用来同步,因为多个线程并发访问volatile修饰的变量不会阻塞。

    3.synchronized不仅保证可见性,而且还保证原子性,因为,只有获得了锁的线程才能进入临界区

    从而保证临界区中的所有语句都全部执行。多个线程争抢synchronized锁对象时,会出现阻塞。

    用法示例

    /*现在有两个线程,一个是main线程,另一个是RunThread。
    	它们都试图修改 第三行的 isRunning变量。
    	按照JVM内存模型,main线程将isRunning读取到本地线程内存空间,修改后,再刷新回主内存。
    	线程会一直在私有堆栈中读取isRunning变量。
    	因此,RunThread线程无法读到main线程改变的isRunning变量
    	从而出现了死循环,导致RunThread无法终止。
    	解决方法,在第三行代码处用 volatile 关键字修饰即可。
    	这里,它强制线程从主内存中取 volatile修饰的变量。*/
    public class ThreadDemo {
        public static void main(String[] args) {
            try {
                RunThread thread = new RunThread();
                thread.start();
                Thread.sleep(1000);
                thread.setRunning(false);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    class RunThread extends Thread {
    
        private volatile boolean isRunning = true;
    
        public boolean isRunning() {
            return isRunning;
        }
    
        public void setRunning(boolean isRunning) {
            this.isRunning = isRunning;
        }
    
        @Override
        public void run() {
            System.out.println("进入到run方法中了");
            while (isRunning == true) {
            }
            System.out.println("线程执行完成了");
        }
    }
    

    ThreadLocal

    线程安全问题: 多线程环境下,存在多条原子性语句操作共享数据,并且对数据做了写的操作,会出现线程安全问题,而ThreadLocal为变量在每个线程中都创建了一个副本,那样每个线程可以访问自己内部的副本变量,这样就解决了安全问题。

    代码示例:

    public class ThreadLocalDemo {
    	public static void main(String[] args) {
    		User user = new User("隔壁老王", "123456");
    		// MyThreadLocal<User> tl = new MyThreadLocal<User>();
    		ThreadLocal<User> tl = new ThreadLocal<>();
    		tl.set(user);
    		
    		new Thread(new Runnable() {
    			
    			@Override
    			public void run() {
    				tl.set(new User("隔壁老李", "789456"));
    				System.out.println(tl.get());
    			}
    		}).start();
    		
    		try {
    			Thread.sleep(1000);
    		} catch (InterruptedException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}
    		
    		System.out.println(tl.get());
    	}
    }
    
    // 容器,这个容器用来绑定 线程 和 数据 Map
    class MyThreadLocal<T> {
    	
    	private HashMap<Thread, T> map;
    	
    	public MyThreadLocal() {
    		map = new HashMap<>();
    	}
    
    	public void put(T t) {
    		//		main = new User("隔壁老王", "123456")
    		map.put(Thread.currentThread(), t);
    	}
    	
    	public T get() {
    		return map.get(Thread.currentThread());
    	}
    }
    
    class User {
    	private String userName;
    	private String password;
    	public User() {
    		super();
    	}
    	public User(String userName, String password) {
    		super();
    		this.userName = userName;
    		this.password = password;
    	}
    	public String getUserName() {
    		return userName;
    	}
    	public void setUserName(String userName) {
    		this.userName = userName;
    	}
    	public String getPassword() {
    		return password;
    	}
    	public void setPassword(String password) {
    		this.password = password;
    	}
    	@Override
    	public String toString() {
    		return "User [userName=" + userName + ", password=" + password + "]";
    	}
    	
    }
    
    

    定时器

    Timer定时器对象和TimerTask 定时器任务

    代码示例:

    public class TimerTest {
    	public static void main(String[] args) {
    		Timer t = new Timer();
    		TimerTask task = new MyTask(t);
    		Calendar c = Calendar.getInstance();
    		c.set(2019, 7, 24, 15, 24, 30);
    		long time = c.getTimeInMillis();
    		Date d = new Date(time);
    		// 规定制定的时间启动线程
    		// t.schedule(task, d);
    		
    		// t.schedule(task, 3000L);
    		
    		t.schedule(task, 3000, 1000);
    	}
    }
    
    class MyTask extends TimerTask {
    	
    	private Timer t;
    	
    	public MyTask(Timer t) {
    		super();
    		this.t = t;
    	}
    
    	public MyTask() {
    		super();
    	}
    
    	@Override
    	public void run() {
    		System.out.println("Boom!!!");
    		// t.cancel();
    	}
    	
    }
    
    展开全文
  • Kafka的producer和consumer都可以多线程地并行操作,而每个线程处理的是一个分区的数据。因此分区实际上是调优Kafka并行度的最小单元。对于producer而言,它实际上是用多个线程并发地向不同分区所...
    By 大数据技术与架构场景描述:Kafka使用分区将topic的消息打散到多个分区分布保存在不同的broker上,实现了producerconsumer消息处理的高吞吐量。Kafkaproducerconsumer都可以多线程地并行操作,而每个线程处理的是一个分区的数据。因此分区实际上是调优Kafka并行度的最小单元。对于producer而言,它实际上是用多个线程并发地向不同分区所在的broker发起Socket连接同时给这些分区发送消息;而consumer,同一个消费组内的所有consumer线程都被指定topic的某一个分区进行消费。

      所以说,如果一个topic分区越多,理论上整个集群所能达到的吞吐量就越大。

    分区多的优点

    kafka使用分区将topic的消息打散到多个分区分布保存在不同的broker上,实现了producer和consumer消息处理的高吞吐量。Kafka的producer和consumer都可以多线程地并行操作,而每个线程处理的是一个分区的数据。因此分区实际上是调优Kafka并行度的最小单元。对于producer而言,它实际上是用多个线程并发地向不同分区所在的broker发起Socket连接同时给这些分区发送消息;而consumer,同一个消费组内的所有consumer线程都被指定topic的某一个分区进行消费。

    所以说,如果一个topic分区越多,理论上整个集群所能达到的吞吐量就越大。

    分区不是越多越好

    分区是否越多越好呢?显然也不是,因为每个分区都有自己的开销:

    一、客户端/服务器端需要使用的内存就越多

    Kafka0.8.2之后,在客户端producer有个参数batch.size,默认是16KB。它会为每个分区缓存消息,一旦满了就打包将消息批量发出。看上去这是个能够提升性能的设计。不过很显然,因为这个参数是分区级别的,如果分区数越多,这部分缓存所需的内存占用也会更多。假设你有10000个分区,按照默认设置,这部分缓存需要占用约157MB的内存。而consumer端呢?我们抛开获取数据所需的内存不说,只说线程的开销。如果还是假设有10000个分区,同时consumer线程数要匹配分区数(大部分情况下是最佳的消费吞吐量配置)的话,那么在consumer client就要创建10000个线程,也需要创建大约10000个Socket去获取分区数据。这里面的线程切换的开销本身已经不容小觑了。

    服务器端的开销也不小,如果阅读Kafka源码的话可以发现,服务器端的很多组件都在内存中维护了分区级别的缓存,比如controller,FetcherManager等,因此分区数越多,这种缓存的成本就越大。

    二、文件句柄的开销

    每个分区在底层文件系统都有属于自己的一个目录。该目录下通常会有两个文件:base_offset.log和base_offset.index。Kafak的controller和ReplicaManager会为每个broker都保存这两个文件句柄(file handler)。很明显,如果分区数越多,所需要保持打开状态的文件句柄数也就越多,最终可能会突破你的ulimit -n的限制。

    三、降低高可用性

    Kafka通过副本(replica)机制来保证高可用。具体做法就是为每个分区保存若干个副本(replica_factor指定副本数)。每个副本保存在不同的broker上。其中的一个副本充当leader 副本,负责处理producer和consumer请求。其他副本充当follower角色,由Kafka controller负责保证与leader的同步。如果leader所在的broker挂掉了,contorller会检测到然后在zookeeper的帮助下重选出新的leader——这中间会有短暂的不可用时间窗口,虽然大部分情况下可能只是几毫秒级别。但如果你有10000个分区,10个broker,也就是说平均每个broker上有1000个分区。此时这个broker挂掉了,那么zookeeper和controller需要立即对这1000个分区进行leader选举。比起很少的分区leader选举而言,这必然要花更长的时间,并且通常不是线性累加的。如果这个broker还同时是controller情况就更糟了。

    如何确定分区数量呢?  

    可以遵循一定的步骤来尝试确定分区数:创建一个只有1个分区的topic,然后测试这个topic的producer吞吐量和consumer吞吐量。假设它们的值分别是Tp和Tc,单位可以是MB/s。然后假设总的目标吞吐量是Tt,那么分区数 =  Tt / max(Tp, Tc)

    说明:Tp表示producer的吞吐量。测试producer通常是很容易的,因为它的逻辑非常简单,就是直接发送消息到Kafka就好了。Tc表示consumer的吞吐量。测试Tc通常与应用的关系更大, 因为Tc的值取决于你拿到消息之后执行什么操作,因此Tc的测试通常也要麻烦一些。

    一条消息如何知道要被发送到哪个分区?

    按照key值分配

    默认情况下,Kafka根据传递消息的key来进行分区的分配,即hash(key) % numPartitions:

    def partition(key: Any, numPartitions: Int): Int = {    Utils.abs(key.hashCode) % numPartitions }

    这保证了相同key的消息一定会被路由到相同的分区。key为null时,从缓存中取分区id或者随机取一个。如果你没有指定key,那么Kafka是如何确定这条消息去往哪个分区的呢?

    8703276090c661006654bdc143afdcab.png

    不指定key时,Kafka几乎就是随机找一个分区发送无key的消息,然后把这个分区号加入到缓存中以备后面直接使用——当然了,Kafka本身也会清空该缓存(默认每10分钟或每次请求topic元数据时)。

    Consumer个数与分区数有什么关系?

    topic下的一个分区只能被同一个consumer group下的一个consumer线程来消费,但反之并不成立,即一个consumer线程可以消费多个分区的数据,比如Kafka提供的ConsoleConsumer,默认就只是一个线程来消费所有分区的数据。

    7d4c1030adeee9013b2f8a0028d52550.png

    所以,如果你的分区数是N,那么最好线程数也保持为N,这样通常能够达到最大的吞吐量。超过N的配置只是浪费系统资源,因为多出的线程不会被分配到任何分区。

    Consumer消费Partition的分配策略

    Kafka提供的两种分配策略:range和roundrobin,由参数partition.assignment.strategy指定,默认是range策略。

    当以下事件发生时,Kafka 将会进行一次分区分配:

    • 同一个 Consumer Group 内新增消费者

    • 消费者离开当前所属的Consumer Group,包括shuts down 或 crashes

    • 订阅的主题新增分区

    将分区的所有权从一个消费者移到另一个消费者称为重新平衡(rebalance),如何rebalance就涉及到本文提到的分区分配策略。

    下面我们将详细介绍 Kafka 内置的两种分区分配策略。本文假设我们有个名为 T1 的主题,其包含了10个分区,然后我们有两个消费者(C1,C2)

    来消费这10个分区里面的数据,而且 C1 的 num.streams = 1,C2 的 num.streams = 2。

    Range strategy

    Range策略是对每个主题而言的,首先对同一个主题里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。在我们的例子里面,排完序的分区将会是0, 1, 2, 3, 4, 5, 6, 7, 8, 9;消费者线程排完序将会是C1-0, C2-0, C2-1。然后将partitions的个数除于消费者线程的总数来决定每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。在我们的例子里面,我们有10个分区,3个消费者线程, 10 / 3 = 3,而且除不尽,那么消费者线程 C1-0 将会多消费一个分区,所以最后分区分配的结果看起来是这样的:

    • C1-0 将消费 0, 1, 2, 3 分区

    • C2-0 将消费 4, 5, 6 分区

    • C2-1 将消费 7, 8, 9 分区

    假如我们有11个分区,那么最后分区分配的结果看起来是这样的:

    • C1-0 将消费 0, 1, 2, 3 分区

    • C2-0 将消费 4, 5, 6, 7 分区

    • C2-1 将消费 8, 9, 10 分区

    假如我们有2个主题(T1和T2),分别有10个分区,那么最后分区分配的结果看起来是这样的:

    • C1-0 将消费 T1主题的 0, 1, 2, 3 分区以及 T2主题的 0, 1, 2, 3分区

    • C2-0 将消费 T1主题的 4, 5, 6 分区以及 T2主题的 4, 5, 6分区

    • C2-1 将消费 T1主题的 7, 8, 9 分区以及 T2主题的 7, 8, 9分区

    可以看出,C1-0 消费者线程比其他消费者线程多消费了2个分区,这就是Range strategy的一个很明显的弊端。

    RoundRobin strategy

    使用RoundRobin策略有两个前提条件必须满足:

    同一个Consumer Group里面的所有消费者的num.streams必须相等;

    每个消费者订阅的主题必须相同。

    所以这里假设前面提到的2个消费者的num.streams = 2。RoundRobin策略的工作原理:将所有主题的分区组成 TopicAndPartition 列表,然后对 TopicAndPartition 列表按照 hashCode 进行排序,看下面的代码应该会明白:

    36e3b7465d55a06a161d0b9f8d924281.png

    最后按照round-robin风格将分区分别分配给不同的消费者线程。

    在这个的例子里面,假如按照 hashCode 排序完的topic-partitions组依次为T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4, T1-7, T1-6, T1-9,我们的消费者线程排序为C1-0, C1-1, C2-0, C2-1,最后分区分配的结果为:

    • C1-0 将消费 T1-5, T1-2, T1-6 分区;

    • C1-1 将消费 T1-3, T1-1, T1-9 分区;

    • C2-0 将消费 T1-0, T1-4 分区;

    • C2-1 将消费 T1-8, T1-7 分区;

    多个主题的分区分配和单个主题类似。遗憾的是,目前我们还不能自定义分区分配策略,只能通过partition.assignment.strategy参数选择 range 或 roundrobin。

    来源:http://suo.im/5yXBUO作者:AlferWei

    246016485bd1579f04a31fea9dd02729.png

    展开全文
  • 多线程

    2019-08-14 19:45:13
    1、多线程2、多线程执行原理3、是不是线程越多越好?三、线程和进程的区别?四、创建线程有哪几种方式?1、 继承Thread类创建线程类2、通过Runnable接口创建线程类3、通过Callable和Future创建线程五、run方法是干...
  • JDK1.8的ConcurrentHashmap 利用 ==CAS + synchronized== 来保证并发更新的安全 底层使用==数组+链表+红黑树==来实现 -----------------------------------------------------------------...因为多线程环境下,使...
  • 多线程的理解

    2020-10-29 18:16:08
    文章目录1 多线程2线程是不是越多越好?2.1 例子说明2.2 原因:3 线程池往下看 1 多线程 2线程是不是越多越好? 2.1 例子说明 启动: 查看运行情况: 2.2 原因: 3 线程池 往下看 ......
  • 1. Kafka的分区数是不是越多越好? 1.1 分区多的优点 Kafka使用分区将topic的消息打算到多个分区分布保存在不同的broker上,实现了producer和consumer消息处理的高吞吐量。 Kafka的producer和consumer都可以多线程...
  • java多线程

    2018-02-05 16:49:00
    传入的参数代表我们配置的线程数,是不是越多越好呢?肯定不是。因为我们在配置线程数的时候要充分考虑服务器的性能,线程配置的多,服务器的性能未必就优。通常,机器完成的计算是由线程数决定的,当线程数到达峰值...
  • 传入的参数代表我们配置的线程数,是不是越多越好呢?肯定不是。因为我们在配置线程数的时候要充分考虑服务器的性能,线程配置的多,服务器的性能未必就优。通常,机器完成的计算是由线程数决定的,当线程数到达峰值...
  • C++多线程多少个线程算多?

    千次阅读 2018-10-25 16:54:08
    那么,是不是线程越多越好呢? 假设我们有100个下载任务,我们可以有以下3种实现方法: 使用一个线程,依次执行100个下载任务; 使用100个线程,每个线程执行一个下载任务; 使用10个线程,每个线程依次执行10个...
  • C++多线程编程

    2020-09-01 10:41:04
    3、我们知道多线程可以提高程序运行效率,那是不是线程数越多越好? 4、对于3的回答一定是否定的,那么线程数到底设置成多少效率最高? 5、既然我们知道创建多少个线程数是合适的,那么为什么还要搞一个线程池呢? 6...
  • Java多线程

    2015-08-07 13:51:52
    介绍 在使用Java作为开发语言时,如果需要高效率、并行地处理请求,通常采用的方法就是“多线程”。...所以说,是不是要采用多线程,以及要同时开启多少个线程都是要根据实际需要来定的,不是越多越好
  • 浅淡Java多线程

    2017-03-21 17:03:00
    工作中一直忙着实现业务逻辑,多线程接触得不多。对多线程的认知,一直停留在...举个例子,男人(CPU),老婆(线程),那问题可以转换成:是不是老婆越多越好?答案相信大家心中有数。什么?你觉得老婆越多越好...
  • 多线程之线程池多线程概念进程和线程单线程和多线程多线程的产生条件多线程的意义多线程引发的思考---线程是不是越多越好?线程池概念线程池的组成线程池常用API说明线程池的运行原理线程池的使用线程池的关闭---...
  • 多线程多少算多?

    2017-11-17 09:39:43
    最近在抓软件的性能优化。...那么,是不是线程越多越好呢? 假设我们有100个下载任务,我们可以有以下3种实现方法: 使用一个线程,依次执行100个下载任务;使用100个线程,每个线程执行一个下
  • java多线程的深入理解以及原理解读

    千次阅读 2020-10-12 22:00:26
    一....因此多线程也不是越多越好。 原因二大部分项目当中如果使用单线程,那么从一个请求进来到响应,只有协议解析和响应后数据处理占用了CPU(先不考虑计算型服务),那么请求发送到服务后,CPU一直..
  • 当有一个线程的时候,是不是越多越好。 当然不是,当执行线程的时候,线程将会不断的来回切换,大量的时间全部浪费在线程的切换上面。并不是越多越好。 一个java线程对应这cpu内核的一个线程 1.锁的...
  •  多线程开发不仅提升了了程序执行的效率,更是大大解决了单线程中所无法解决的同步问题,那么多线程开发...1. 线程创建是否越多越好?怎么样提高效率? 举一个多线程搜索的例子,写一个搜索文件后缀的方法: ...
  • 文章目录为什么要用线程池线程是不是越多越好?线程池原理 - 概念线程池API中接口的定义和实现类 为什么要用线程池 在学习线程池之前首先要明白为什么要用线程池,难倒创建线程他不香吗? 线程是不是越多越好? 答案肯定...
  • 最近在做一个爬虫相关的项目,单线程的整站爬虫,耗时真的...一、 既然多线程可以缩短程序运行时间,那么,是不是线程数量越多越好呢?显然,并不是,每一个线程的从生成到消亡也是需要时间和资源的,太多的线程会占...
  • 多线程开发不仅提升了了程序执行的效率,更是大大解决了单线程中所无法解决的同步问题,那么多线程...1. 线程创建是否越多越好?怎么样提高效率? 举一个多线程搜索的例子,写一个搜索文件后缀的方法: publicvo
  • 线程是不是越多越好 不是 如无必要,尽量嫌少使用线程 如果线程 创建时间+销毁时间 > 执行时间 着很不划算 线程 [默认最大栈1M] 会占用内存 操作系统 频繁的切换上下文 ,非常影响性能 为了 既能保障 同时...
  • 一、 既然多线程可以缩短程序运行时间,那么,是不是线程数量越多越好呢?显然,并不是,每一个线程的从生成到消亡也是需要时间和资源的,太多的线程会占用过多的系统资源(内存开销,cpu开销),而且生成太多的线程...
  • 一、需求缘起 Web-Server通常有个配置,最大工作线程数,后端服务一般也有个配置,工作线程池的线程数量,这个线程数的配置不同的业务...提问:工作线程是不是设置的大越? 回答:肯定不是的 1)一来服务器CPU核
  • 锁细化:同步代码块中语句越少越好,只在必要同步的代码上加锁 锁粗化:代码中加了个细锁,不如在大块中加锁,减少代码争用 3:但是效率问题还是可以使用更好的方法解决。比如使用volatile和CAS,也叫无锁,至于...

空空如也

空空如也

1 2 3 4 5 6
收藏数 118
精华内容 47
关键字:

多线程是不是越多越好