2018-11-19 20:47:01 m0_37542524 阅读数 368
  • C++多线程编程视频教程(C++11多线程并发

    线程与进程相比,它是一种花销小,切换快,更节俭的多任务的操作方式。多编程并发在企业中开发显得尤为重要,本课程包含Windows多线程编程与C++11高并发编程,通过浅显易懂的代码与讲解,让你的多线程编程能力得到质的飞跃,具备开发高并发代码的能力!

    3109 人正在学习 去看看 黄强


在这里插入图片描述

TCP循环服务器模型 TCP多进程并发服务器 TCP多线程服务器
socket(...);
bind(...);
listen(...);
while(1){
  accept(...);
  process(...);
  close(...);
}
socket(...);
bind(...);
listen(...);
while(1){
  accpet(...);
  if(fork(...) == 0){
    process(...);
    close(...);
    exit(...);
  }
  close(...);
}
socket(...);
bind(...);
listen(...);
while(1){
  accpet(...);
  if((pthread_create(...))!==-1) {
    process(...);
    close(...);
    exit(...);
  }
  close(...)
}
TCP服务器一般很少用 TCP并发服务器的思想是每一个客户机的请求并不由服务器直接处理,而是由服务器创建一个子进程来处理。 多线程服务器是对多进程的服务器的改进

1,TCP多线程并发服务器

1.1,头文件net.h

#ifndef __NET_H__
#define __NET_H__

#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <strings.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdlib.h>
#include <errno.h>

#define SERV_IP_ADDR "192.168.31.100"
#define SERV_PORT 5005
#define BACKLOG 5
#define QUIT_STR "quite"

#endif

1.2,客户端client.c

>## /* ./client serv_ip serv_port */
#include "net.h"

void usage(char *s)
{
	printf("Usage: %s <serv_ip> <serv_port>\n",s);
	printf("\tserv_ip: server ip address\n");
	printf("\tserv_port: server port(>5000)\n ");
}
int main(int argc, const char *argv[])
{
	int fd;
	short port;
	struct sockaddr_in sin;
	if(argc != 3)
	{
		usage((char *)argv[0]);
		exit(1);
	}
	if((port = atoi(argv[2])) < 5000)
	{
		usage((char *)argv[0]);
		exit(1);
	}
	/* 1 创建socket fd */
	if((fd = socket(AF_INET,SOCK_STREAM,0)) < 0)
	{
		perror("socket");
		exit(-1);
	}
	
	/* 2 连接服务器 */
	/* 2.1 填充struct sockaddr_in结构体变量*/
	bzero(&sin,sizeof(sin));
	sin.sin_family = AF_INET;
	sin.sin_port = htons(port);//转为网络字节序端口号
	if(inet_pton(AF_INET,argv[1],(void *)&sin.sin_addr.s_addr) < 0)
	{
		perror("inet_pton");
		goto _error1;
	}

	/* 2.2 连接服务器*/
	if(connect(fd,(struct sockaddr *)&sin,sizeof(sin)) < 0)
	{
		perror("connect");
		goto _error1;
	}
	/* 3 读写*/
	char buf[BUFSIZ];
	while(1)
	{
		bzero(buf,BUFSIZ);
		if(fgets(buf,BUFSIZ-1,stdin) == NULL)
		{
			continue;
		}
		write(fd,buf,strlen(buf));
		if(strncasecmp(buf,QUIT_STR,strlen(QUIT_STR)) == 0)
		{
			printf("client is existing!\n");
			break;
		}
	}

_error1:
	close(fd);
	return 0;
}

1.3,服务器端server.c

#include "net.h"
#include <pthread.h>

/* 线程传参  */
typedef struct{
	int addr;//客户端IP地址
	int port;//客户端端口号
	int fd;//为请求链接的客户端分配的新的socket fd
}ARG;

/* 子线程处理函数 */
void client_data_handle(void *arg);

int main(int argc, const char *argv[])
{
	pthread_t tid;//子线程ID号
	int fd;
	struct sockaddr_in sin;//如果是IPV6的编程,要使用struct sockddr_in6结构体(详细情况请参考man 7 ipv6),通常更通用的方法可以通过struct sockaddr_storage来编程

	/* 1 创建socket fd */
	if((fd = socket(AF_INET,SOCK_STREAM,0)) < 0)
	{
		perror("socket");
		exit(-1);
	}
	/* 优化 1 允许绑定地址快速重用 */ 
	int b_reuse = 1;
	setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,&b_reuse,sizeof(int)); 
	
	/* 2 绑定 */
	/* 2.1 填充struct sockaddr_in 结构体变量*/
	bzero(&sin,sizeof(sin));
	sin.sin_family = AF_INET;
	sin.sin_port = htons(SERV_PORT);
#if 1
	/* 优化 2 让服务器可以绑定在任意的IP上*/
	sin.sin_addr.s_addr = htonl(INADDR_ANY);
#else
	if(inet_pton(AF_INET,SERV_IP_ADDR,(void *)&sin.sin_addr.s_addr) < 0)
	{
		perror("inet_pton");
		goto _error1;
	}
#endif
	/* 2.2 绑定*/
	if(bind(fd,(struct sockaddr *)&sin,sizeof(sin)))
	{
		perror("bind");
		goto _error1;
	}

	/* 3 使用listen()把主动套接字变成被动套接字 */
	if(listen(fd,BACKLOG) < 0)
	{
		perror("listen");
		goto _error1;
	}
	
	int newfd = -1;
	struct sockaddr_in cin;
	socklen_t cin_addr_len = sizeof(cin);
	/* 优化 4 用多进程/多线程处理已经建立好链接的客户端数据*/
	while(1)
	{
		/* 4 阻塞等待客户端链接请求*/
		/* 优化 3 通过函数获取刚建立链接的客户端的IP地址和端口号*/
		if((newfd = accept(fd,(struct sockaddr *)&cin,&cin_addr_len)) < 0)
		{
			perror("connect");
			goto _error1;
		}
		
		ARG arg;
		arg.addr = cin.sin_addr.s_addr;
		arg.port = ntohs(cin.sin_port);
		arg.fd = newfd;
		
		if(pthread_create(&tid,NULL,(void *)client_data_handle,(void *)&arg) != 0)
		{
			perror("pthread_create");
			goto _errno2;
		}
	}

_errno2:
	close(newfd);
_error1:
	close(fd);
	return 0;
}

void client_data_handle(void *arg)
{
	int ret = -1;//read()是个阻塞函数,要做读写错误的工程处理
	char buf[BUFSIZ];//BUFSIZ是系统提供的
	char cin_ipv4_addr[16];
	ARG parg = *(ARG *)arg;
		
	if(inet_ntop(AF_INET,&parg.addr,cin_ipv4_addr,sizeof(cin_ipv4_addr)) < 0)
	{
		perror("inet_ntop");
		exit(-1);
	}

	printf("client(:%s potr(:%d\n",cin_ipv4_addr,parg.port);

	printf("the client pthread fd is %d\n",parg.fd);
	while(1)
	{
		/* 5 读写*/
		bzero(buf,BUFSIZ);
		do
		{
			ret = read(parg.fd,buf,BUFSIZ-1);
		}while(ret < 0 && errno == EINTR);//阻塞读写
		if(ret < 0)
		{
			perror("read");
			break;
		}
		if(ret == 0)//对方已关闭
		{
			break;
		}

		printf("receive data: %s",buf);
	
		
		if(strncasecmp(buf,QUIT_STR,strlen(QUIT_STR)) == 0)
		{
			printf("client is existing!\n");
			break;
		}
	}
	close(parg.fd);
	printf("pthread fd %d is closed!\n",parg.fd);
}

2,TCP多进程并发服务器

2.1,头文件net.h

#ifndef __NET_H__
#define __NET_H__

#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <strings.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdlib.h>
#include <errno.h>

#define SERV_IP_ADDR "192.168.31.100"
#define SERV_PORT 5005
#define BACKLOG 5
#define QUIT_STR "quite"

#endif

2.2,客户端程序client.c

/* ./client serv_ip serv_port */
#include "net.h"

void usage(char *s)
{
	printf("Usage: %s <serv_ip> <serv_port>\n",s);
	printf("\tserv_ip: server ip address\n");
	printf("\tserv_port: server port(>5000)\n ");
}
int main(int argc, const char *argv[])
{
	int fd;
	short port;
	struct sockaddr_in sin;
	if(argc != 3)
	{
		usage((char *)argv[0]);
		exit(1);
	}
	if((port = atoi(argv[2])) < 5000)
	{
		usage((char *)argv[0]);
		exit(1);
	}
	/* 1 创建socket fd */
	if((fd = socket(AF_INET,SOCK_STREAM,0)) < 0)
	{
		perror("socket");
		exit(-1);
	}
	
	/* 2 连接服务器 */
	/* 2.1 填充struct sockaddr_in结构体变量*/
	bzero(&sin,sizeof(sin));
	sin.sin_family = AF_INET;
	sin.sin_port = htons(port);//转为网络字节序端口号
	if(inet_pton(AF_INET,argv[1],(void *)&sin.sin_addr.s_addr) < 0)
	{
		perror("inet_pton");
		goto _error1;
	}

	/* 2.2 连接服务器*/
	if(connect(fd,(struct sockaddr *)&sin,sizeof(sin)) < 0)
	{
		perror("connect");
		goto _error1;
	}
	/* 3 读写*/
	char buf[BUFSIZ];
	while(1)
	{
		bzero(buf,BUFSIZ);
		if(fgets(buf,BUFSIZ-1,stdin) == NULL)
		{
			continue;
		}
		write(fd,buf,strlen(buf));
		if(strncasecmp(buf,QUIT_STR,strlen(QUIT_STR)) == 0)
		{
			printf("client is existing!\n");
			break;
		}
	}

_error1:
	close(fd);
	return 0;
}

2.3,服务器端程序service.c

#include "net.h"
#include <pthread.h>
#include <signal.h>

/* 线程传参  */
typedef struct{
	int addr;//客户端IP地址
	int port;//客户端端口号
	int fd;//为请求链接的客户端分配的新的socket fd
}ARG;

/* 子线程处理函数 */
void client_data_handle(ARG *arg);
void sig_child_handle(int signo)
{
	if(SIGCHLD == signo)
	{
		waitpid(-1,NULL,WNOHANG);
	}
}

int main(int argc, const char *argv[])
{
	int fd;
	struct sockaddr_in sin;//如果是IPV6的编程,要使用struct sockddr_in6结构体(详细情况请参考man 7 ipv6),通常更通用的方法可以通过struct sockaddr_storage来编程
	pid_t pid;//子进程ID

	signal(SIGCHLD,sig_child_handle);

	/* 1 创建socket fd */
	if((fd = socket(AF_INET,SOCK_STREAM,0)) < 0)
	{
		perror("socket");
		exit(-1);
	}
	/* 优化 1 允许绑定地址快速重用 */ 
	int b_reuse = 1;
	setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,&b_reuse,sizeof(int)); 
	
	/* 2 绑定 */
	/* 2.1 填充struct sockaddr_in 结构体变量*/
	bzero(&sin,sizeof(sin));
	sin.sin_family = AF_INET;
	sin.sin_port = htons(SERV_PORT);
#if 1
	/* 优化 2 让服务器可以绑定在任意的IP上*/
	sin.sin_addr.s_addr = htonl(INADDR_ANY);
#else
	if(inet_pton(AF_INET,SERV_IP_ADDR,(void *)&sin.sin_addr.s_addr) < 0)
	{
		perror("inet_pton");
		goto _error1;
	}
#endif
	/* 2.2 绑定*/
	if(bind(fd,(struct sockaddr *)&sin,sizeof(sin)))
	{
		perror("bind");
		goto _error1;
	}

	/* 3 使用listen()把主动套接字变成被动套接字 */
	if(listen(fd,BACKLOG) < 0)
	{
		perror("listen");
		goto _error1;
	}
	
	struct sockaddr_in cin;
	socklen_t cin_addr_len = sizeof(cin);
	/* 优化 4 用多进程/多线程处理已经建立好链接的客户端数据*/
	while(1)
	{
		int newfd = -1;
		/* 4 阻塞等待客户端链接请求*/
		/* 优化 3 通过函数获取刚建立链接的客户端的IP地址和端口号*/
		if((newfd = accept(fd,(struct sockaddr *)&cin,&cin_addr_len)) < 0)
		{
			perror("connect");
			goto _error1;
		}
		if((pid = fork()) < 0)
		{
			perror("fork");
			break;
		}
		else if(pid == 0)
		{
			ARG arg;
			arg.addr = cin.sin_addr.s_addr;
			arg.port = ntohs(cin.sin_port);
			arg.fd = newfd;

			close (fd);
			client_data_handle(&arg);
			return 0;
		}
		else
		{
			close(newfd);
		}
		close(newfd);

	}

_error1:
	close(fd);
	return 0;
}

void client_data_handle(ARG *arg)
{
	int ret = -1;//read()是个阻塞函数,要做读写错误的工程处理
	char buf[BUFSIZ];//BUFSIZ是系统提供的
	char cin_ipv4_addr[16];
	ARG parg = *arg;
		
	if(inet_ntop(AF_INET,&parg.addr,cin_ipv4_addr,sizeof(cin_ipv4_addr)) < 0)
	{
		perror("inet_ntop");
		exit(-1);
	}

	printf("client(:%s potr(:%d\n",cin_ipv4_addr,parg.port);

	printf("the client process fd is %d\n",parg.fd);
	while(1)
	{
		/* 5 读写*/
		bzero(buf,BUFSIZ);
		do
		{
			ret = read(parg.fd,buf,BUFSIZ-1);
		}while(ret < 0 && errno == EINTR);//阻塞读写
		if(ret < 0)
		{
			perror("read");
			break;
		}
		if(ret == 0)//对方已关闭
		{
			break;
		}

		printf("receive data: %s",buf);
		
		if(strncasecmp(buf,QUIT_STR,strlen(QUIT_STR)) == 0)
		{
			printf("client is existing!\n");
			break;
		}
	}
	close(parg.fd);
	printf("process fd %d is closed!\n",parg.fd);

	return ;
}
2016-12-21 11:21:22 qq_29227939 阅读数 6332
  • C++多线程编程视频教程(C++11多线程并发

    线程与进程相比,它是一种花销小,切换快,更节俭的多任务的操作方式。多编程并发在企业中开发显得尤为重要,本课程包含Windows多线程编程与C++11高并发编程,通过浅显易懂的代码与讲解,让你的多线程编程能力得到质的飞跃,具备开发高并发代码的能力!

    3109 人正在学习 去看看 黄强

  上一篇文章使用fork函数实现了多进程并发服务器,但是也提到了一些问题:

  1. fork是昂贵的。fork时需要复制父进程的所有资源,包括内存映象、描述字等;
  2. 目前的实现使用了一种写时拷贝(copy-on-write)技术,可有效避免昂贵的复制问题,但fork仍然是昂贵的;
  3. fork子进程后,父子进程间、兄弟进程间的通信需要进程间通信IPC机制,给通信带来了困难;
  4. 多进程在一定程度上仍然不能有效地利用系统资源;
  5. 系统中进程个数也有限制。

  下面就介绍实现并发服务器的另外一种方式,使用多线程实现。多线程有助于解决以上问题。

线程基础

  关于线程的概念就不介绍了,先了解一下linux下线程的一些基本操作。

线程基础函数

  • pthread_create 创建线程

  pthread_create 函数用于创建新线程。当一个程序开始运行时,系统产生一个称为初始线 程或主线程的单个线程。额外的线程需要由 pthread_create 函数创建。 pthread_create 函数原型如下:

#include <pthread.h> 
int pthread_create(pthread_t *tid, const pthread_attr_t *attr, void *(*func)(void *), void *arg); 

  如果新线程创建成功,参数 tid 返回新生成的线程 ID。一个进程中的每个线程都由一个 线程 ID 标识,其类型为 pthread_t。attr 指向线程属性的指针。每个线程有很多属性包括:优 先级、起始栈大小、是否是守护线程等等。通常将 attr 参数的值设为 NULL,这时使用系统 默认的属性。
  但创建完一个新的线程后,需要说明它将执行的函数。函数的地址由参数 func 指定。该函数必须是一个静态函数,它只有一个通用指针作为参数,并返回一个通用指针。该执行函 数的调用参数是由 arg 指定,arg 是一个通用指针,用于往 func 函数中传递参数。如果需要传递多个参数时,必须将它们打包成一个结构,然后让 arg 指向该结构。线程以调用该执行 函数开始。
  如果函数调用成功返回 0,出错则返回非 0。

  常见的返回错误值:

EAGAIN:超过了系统线程数目的限制。
ENOMEN:没有足够的内存产生新的线程。
EINVAL:无效的属性attr值。

  示例代码:

#include <pthread.h>
#include <stdio.h>
pthread_t  tid;
void *ex()
{
    printf("this is a thread");
}
void main()
{
    pthread_create(&tid,NULL,ex,NULL);
}

  给线程传递参数:

void *function(void *arg); 
struct ARG { 
     int connfd; 
     int other;   //other data 
 }; 
 void main()  
 { 
     struct ARG arg; 
     int connfd,sockfd; 
     pthread_t tid; 
     //...
     While(1) 
     {     
         if((connfd = accept(sockfd,NULL,NULL))== -1) 
         { 
              //handle exception                    
          } 
          arg.connfd = connfd; 
          if(pthread_create(&tid, NULL, funtion, (void *)&arg)) 
          { 
              // handle exception 
          } 
      } 
 } 
 void *funtion(void *arg) 
 { 
    struct  ARG info; 
    info.connfd = ((struct ARG *)arg) -> connfd; 
    info.other = ((struct ARG *)arg) -> other; 
    //… 
    close(info.connfd); 
    pthread_exit(NULL); 
 }
  • pthread_join
      看这个函数首先提出一个概念,线程的类型。线程分为两类:可联合的和分离的。

    1. 默认情况下线程都是可联合的。可联合的线程终止 时,其线程 ID 和终止状态将保留,直到线程调用 pthread_join 函数。
    2. 而分离的线程退出后, 系统将释放其所有资源,其他线程不能等待其终止。如果一个线程需要知道另一个线程什么 时候终止,最好保留第二个线程的可联合性。

  pthread_join 函数与进程的 waitpid 函数功能类似,等待一个线程终止。
pthread_join 函数原型如下:

#inlcude <pthread.h>
int pthread_join(pthread_t tid, void **status); 

  参数 tid 指定所等待的线程 ID。该函数必须指定要等待的线程,不能等待任一个线程结束。要求等待的线程必须是当前进程的成员,并且不是分离的线程或守护线程。
  几个线程不 能同时等待一个线程完成,如果其中一个成功调用 pthread_join 函数,则其他线程将返回 ESRCH 错误。
  如果等待的线程已经终止,则该函数立即返回。如果参数 status 指针非空,则 指向终止线程的退出状态值。
  该函数如果调用成功则返回 0,出错时返回正的错误码。

  • pthread_detach
      pthread_detach 函数将指定的线程变成分离的。 pthread_detach 函数原型如下:
#inlcude <pthread.h> 
int pthread_detach(pthread_t tid) ;

  参数 tid 指定要设置为分离的线程 ID。

  • pthread_self
      每一个线程都有一个 ID,pthread_self 函数返回自己的线程 ID。 pthread_self 函数原型如下:
#inlcude <pthread.h> 
pthread_t pthread_self(void); 

  参数 tid 指定要设置为分离的线程 ID。 函数返回调用函数的线程 ID。
  例如,线程可以通过如下语句,将自己设为可分离的:

pthread_detach(pthread_self()); 
  • pthread_exit
      函数 pthread_exit 用于终止当前线程,并返回状态值,如果当前线程是可联合的,则其 退出状态将保留。 pthread_exit函数原型如下:
#include <pthread.h> 
void pthread_exit(void *status);

  参数 status 指向函数的退出状态。这里的 status 不能指向一个局部变量,因为当前线程 终止后,其所有局部变量将被撤销。
  该函数没有返回值。
  还有两种方法可以使线程终止:

  1. 启动线程的函数 pthread_create 的第三个参数返回。该返回值就是线程的终止状态。
  2. 如果进程的 main 函数返回或者任何线程调用了 exit 函数,进程将终止,线程将随之 终止。

  下面可以看一下多线程并发服务器的实例了,需要注意的是,线程建立后,父、子线程不需要关闭任何的描述符,因为线程中使用的描述符是共享进程中的数据。

#include <stdio.h>  
#include <stdlib.h>  
#include <string.h>  
#include <unistd.h>  
#include <sys/types.h>  
#include <sys/socket.h>  
#include <netinet/in.h>  
#include <arpa/inet.h>  
#include <pthread.h>  

#define PORT 1234  
#define BACKLOG 5  
#define MAXDATASIZE 1000  

void process_cli(int connfd, struct sockaddr_in client);  
void *function(void* arg);  
struct ARG {  
    int connfd;  
    struct sockaddr_in client;  
};  

void main()  
{  
    int listenfd,connfd;  
    pthread_t  tid;  
    struct ARG *arg;  
    struct sockaddr_in server;  
    struct sockaddr_in client;  
    socklen_t  len;  

    if ((listenfd =socket(AF_INET, SOCK_STREAM, 0)) == -1) {  
        perror("Creatingsocket failed.");  
        exit(1);  
    }  

    int opt =SO_REUSEADDR;  
    setsockopt(listenfd,SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));  

    bzero(&server,sizeof(server));  
    server.sin_family=AF_INET;  
    server.sin_port=htons(PORT);  
    server.sin_addr.s_addr= htonl (INADDR_ANY);  
    if (bind(listenfd,(struct sockaddr *)&server, sizeof(server)) == -1) {  
        perror("Bind()error.");  
        exit(1);  
    }  

    if(listen(listenfd,BACKLOG)== -1){  
        perror("listen()error\n");  
        exit(1);  
    }  

    len=sizeof(client);  
    while(1)  
    {  
        if ((connfd =accept(listenfd,(struct sockaddr *)&client,&len))==-1) {  
            perror("accept() error\n");  
            exit(1);  
        }  
        arg = (struct ARG *)malloc(sizeof(struct ARG));  
        arg->connfd =connfd;  
        memcpy((void*)&arg->client, &client, sizeof(client));  

        if(pthread_create(&tid, NULL, function, (void*)arg)) {  
            perror("Pthread_create() error");  
            exit(1);  
        }  
    }  
    close(listenfd);  
}  

void process_cli(int connfd, struct sockaddr_in client)  
{  
    int num;  
    char recvbuf[MAXDATASIZE], sendbuf[MAXDATASIZE], cli_name[MAXDATASIZE];  

    printf("Yougot a connection from %s. \n ",inet_ntoa(client.sin_addr) );  
    num = recv(connfd,cli_name, MAXDATASIZE,0);  
    if (num == 0) {  
        close(connfd);  
        printf("Clientdisconnected.\n");  
        return;  
    }  
    cli_name[num - 1] ='\0';  
    printf("Client'sname is %s.\n",cli_name);  

    while (num =recv(connfd, recvbuf, MAXDATASIZE,0)) {  
        recvbuf[num] ='\0';  
        printf("Receivedclient( %s ) message: %s",cli_name, recvbuf);  
        int i;  
        for (i = 0; i <num - 1; i++) {  
            if((recvbuf[i]>='a'&&recvbuf[i]<='z')||(recvbuf[i]>='A'&&recvbuf[i]<='Z'))  
            {  
                recvbuf[i]=recvbuf[i]+ 3;  
                if((recvbuf[i]>'Z'&&recvbuf[i]<='Z'+3)||(recvbuf[i]>'z'))  
                recvbuf[i]=recvbuf[i]- 26;  
            }  
            sendbuf[i] =recvbuf[i];  
        }  
        sendbuf[num -1] = '\0';  
        send(connfd,sendbuf,strlen(sendbuf),0);  
    }  
    close(connfd);  
}  

void *function(void* arg)  
{  
    struct ARG *info;  
    info = (struct ARG*)arg;  
    process_cli(info->connfd,info->client);  
    free (arg);  
    pthread_exit(NULL);  
}  

线程安全性

  上面的示例代码服务器端的业务逻辑都比较简单,没有涉及到共享数据产生的同步问题。在某些情况下,我们需要多个线程共享全局数据,在访问这些数据时就需要用到同步锁机制。而在共享线程内的全局数据时,可以使用Linux提供的线程特定数据TSD解决。

同步机制

  在linux系统中,提供一种基本的进程同步机制—互斥锁,可以用来保护线程代码中共享数据的完整性。
  操作系统将保证同时只有一个线程能成功完成对一个互斥锁的加锁操作。
  如果一个线程已经对某一互斥锁进行了加锁,其他线程只有等待该线程完成对这一互斥锁解锁后,才能完成加锁操作。

互斥锁函数

pthread_mutex_lock(pthread_mutex_t  *mptr)

参数说明:

mptr:指向互斥锁的指针。
该函数接受一个指向互斥锁的指针作为参数并将其锁定。如果互斥锁已经被锁定,调用者将进入睡眠状态。函数返回时,将唤醒调用者。
如果互斥锁是静态分配的,就将mptr初始化为常值PTHREAD_MUTEX_INITIALIZER。 

  锁定成功返回0,否则返回错误码。

pthread_mutex_unlock(pthread_mutex_t  *mptr);

  用于互斥锁解锁操作。成功返回0,否则返回错误码。

示例代码:

#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
int myglobal;
pthread_mutex_t mymutex = PTHREAD_MUTEX_INITIALIZER;
void *thread_function(void *arg) {
    int i, j;
    for (i = 0; i < 5; i++) {
        pthread_mutex_lock(&mymutex);
        j = myglobal;
        j = j + 1;
        printf(".");
        fflush(stdout);
        sleep(1);
        myglobal = j;
        pthread_mutex_unlock(&mymutex);
    }
    return NULL;
}
int main(void) {
    pthread_t mythread;
    int i;
    if (pthread_create(&mythread, NULL, thread_function, NULL)) {
        printf("error creating thread.");
        abort();
    }
    for (i = 0; i < 5; i++) {
        pthread_mutex_lock(&mymutex);
        myglobal = myglobal + 1;
        pthread_mutex_unlock(&mymutex);
        printf("o");
        fflush(stdout);
        sleep(1);
    }
    if (pthread_join(mythread, NULL)) {
        printf("error joining thread.");
        abort();
    }
    printf("\nmyglobal equals %d\n", myglobal);
    exit(0);
}

运行效果

线程私有数据

  在多线程环境里,应避免使用静态变量。在 Linux 系统中提供 了线程特定数据(TSD)来取代静态变量。它类似于全局变量,但是,是各个线程私有的, 它以线程为界限。TSD 是定义线程私有数据的惟一方法。同一进程中的所有线程,它们的同 一特定数据项都由一个进程内惟一的关键字 KEY 来标志。用这个关键字,线程可以存取线程私有数据。 在线程特定数据中通常使用四个函数。

  • pthread_key_create
#include <pthread.h> 
int pthread_key_create(pthread_key_t *key, void (* destructor)(void *value)); 

  pthread_key_create 函数在进程内部分配一个标志 TSD 的关键字。
  参数 key 指向创建的关 键字,该关键字对于一个进程中的所有线程是惟一的。所以在创建 key 时,每个进程只能调 用一次创建函数 pthread_key_create。在 key 创建之前,所有线程的关键字值是 NULL。一旦 关键字被建立,每个线程可以为该关键字绑定一个值。这个绑定的值对于线程是惟一的,每 个线程独立维护。
   参数 destructor 是一个可选的析构函数,可以和每个关键字联系起来。如果一个关键字 的 destructor 函数不为空,且线程为该关键字绑定了一个非空值,那么在线程退出时,析构函 数将会被调用。对于所有关键字的析构函数,执行顺序是不能指定的。
  该函数正常执行后返回值为 0,否则返回错误码。

  • pthread_once
#include <pthread.h> 
int pthread_once(pthread_once_t *once, void (*init) (void)); 

  pthread_once 函数使用 once 参数所指的变量,保证每个进程只调用一次 init 函数。通常 once 参数取常量 PTHREAD_ONCE_INIT,它保证每个进程只调用一次 init 函数。
  该函数正常执行后返回值为 0,否则返回错误码。

  • pthread_setspecific
#include <pthread.h> 
int pthread_setspecific(pthread_key_t key, const void *value);

  pthread_setspecific 函数为 TSD 关键字绑定一个与本线程相关的值。
  参数 key 是 TSD 关 键字。
  参数 value 是与本线程相关的值。value 通常指向动态分配的内存区域。
  该函数正常执行后返回值为 0,否则返回错误码。

  • pthread_getspecific
#include <pthread.h> 
void * pthread_getspecific(pthread_key_t key); 

  pthread_getspecific 函数获取与调用线程相关的 TSD 关键字所绑定的值。
  参数 key 是 TSD 关键字。
  该函数正常执行后返回与调用线程相关的 TSD 关键字所绑定的值。否则返回 NULL。

  线程安全性代码示例:

#include <stdio.h>
#include <pthread.h>
pthread_key_t   key;
void echomsg(int t)
{
    printf("destructor excuted in thread %d,param=%d\n", pthread_self(), t);
}
void * child1(void *arg)
{
    int tid = pthread_self();
    printf("thread1 %d enter\n", tid);
    pthread_setspecific(key, (void *)tid);
    sleep(2);
    printf("thread1 %d key’s %d\n", tid, pthread_getspecific(key));
    sleep(5);
}
void * child2(void *arg)
{
    int tid = pthread_self();
    printf("thread2 %d enter\n", tid);
    pthread_setspecific(key, (void *)tid);
    sleep(1);
    printf("thread2 %d key’s %d\n", tid, pthread_getspecific(key));
    sleep(5);
}
int main(void)
{
    pthread_t tid1, tid2;

    printf("hello\n");
    pthread_key_create(&key, (void *)echomsg);
    pthread_create(&tid1, NULL, child1, NULL);
    pthread_create(&tid2, NULL, child2, NULL);
    sleep(10);
    pthread_join(tid1, NULL);
    pthread_join(tid2, NULL);
    pthread_key_delete(key);
    printf("main thread exit\n");
    return 0;
}

运行结果

小结

  使用多线程实现并发服务器的优点是线程的开销小,切换容易。但是由于线程共享相同 的内存区域,所以在对共享数据的进行操作时,要注意同步问题。其中线程特定数据虽然实现起来比较烦琐,但是它是将一个非线程安全 函数转换成线程安全函数的常用方法。
  除此之外,还可以通过改变调用函数参变量的方式实现线程的安全性,这里不作介绍。
  下一篇文章将介绍另外一种实现并发服务器的方法:I/O 多路复用。

2019-03-23 16:32:33 qq_40861091 阅读数 134
  • C++多线程编程视频教程(C++11多线程并发

    线程与进程相比,它是一种花销小,切换快,更节俭的多任务的操作方式。多编程并发在企业中开发显得尤为重要,本课程包含Windows多线程编程与C++11高并发编程,通过浅显易懂的代码与讲解,让你的多线程编程能力得到质的飞跃,具备开发高并发代码的能力!

    3109 人正在学习 去看看 黄强

最近从windows转到linux学习了一段时间了。其实tcp服务器本质基本都一样。

这次上传一个简单的linux下多线程并发服务器。该服务器可以接收128个客户端同时连接,并且加入了互斥锁,排除了多个客户端同时间连接时出现的问题。并且当前一个客户端关闭连接时,后面连接的客户端可以继续使用该线程。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <pthread.h>
#define maxthread 256
typedef struct SockInfo
{
	int sockfd;
	struct sockaddr_in addr;
	pthread_t id;
}SockInfo; //定义结构体,来存储多客户端的线程编号

pthread_mutex_t mutex;	// 定义互斥锁,全局变量
/************************************************************************
函数名称:	void *client_process(void *arg)
函数功能:	线程函数,处理客户信息
函数参数:	已连接套接字
函数返回:	无
************************************************************************/
void *client_process(void *arg)
{
int recv_len = 0;
char recv_buf[1024] = "";	// 接收缓冲区
int connfd = *(int *)arg; // 传过来的已连接套接字
// 解锁,pthread_mutex_lock()唤醒,不阻塞
pthread_mutex_unlock(&mutex);
// 接收数据
while((recv_len = recv(connfd, recv_buf, sizeof(recv_buf), 0)) > 0)
{
//printf("recv_buf: %s\n", recv_buf); // 打印数据
printf(" 来自fd描述符:%d,内容为: %s\n",connfd, recv_buf); // 打印数据
send(connfd, recv_buf, recv_len, 0); // 给客户端回数据
memset(&recv_buf, 0, recv_len);
}
printf("fd:%d 关闭!\n",connfd);
close(connfd);	//关闭已连接套接字
return NULL;
}
//===============================================================
// 语法格式:	void main(void)
// 实现功能:	主函数,建立一个TCP并发服务器
// 入口参数:	无
// 出口参数:	无
//===============================================================
int main(int argc, char *argv[])
{
int sockfd = 0;	// 套接字
int connfd = 0;
int err_log = 0;
struct sockaddr_in my_addr;	// 服务器地址结构体
unsigned short port =8888 ; // 监听端口
pthread_t thread_id;
pthread_mutex_init(&mutex, NULL); // 初始化互斥锁,互斥锁默认是打开的
printf("TCP Server Started at port %d!\n", port);
sockfd = socket(AF_INET, SOCK_STREAM, 0); // 创建TCP套接字
if(sockfd < 0)
{
perror("socket error");
exit(-1);
}
bzero(&my_addr, sizeof(my_addr));	// 初始化服务器地址
my_addr.sin_family = AF_INET;
my_addr.sin_port = htons(port);
my_addr.sin_addr.s_addr = htonl(INADDR_ANY);
printf("Binding server to port %d\n", port);

int flag=1;//设置端口复用,这样如果是服务器主动断开连接后可以马上使用,略过time_wait阶段的等待
setsockopt(sockfd,SOL_SOCKET,SO_REUSEPORT,&flag,sizeof(flag));

// 绑定
err_log = bind(sockfd, (struct sockaddr*)&my_addr, sizeof(my_addr));
if(err_log != 0)
{
perror("bind");
close(sockfd);
exit(-1);
}
// 监听,套接字变被动
err_log = listen(sockfd,4);
if( err_log != 0)
{
perror("listen");
close(sockfd);
exit(-1);
}
printf("Waiting client...\n");
int i=0;//线程的标志
SockInfo info[maxthread];
for(int i=0;i<maxthread;++i)
{
 info[i].sockfd=-1;
}
socklen_t cli_len=sizeof(struct sockaddr_in);

while(1)
{
char cli_ip[INET_ADDRSTRLEN] = "";	// 用于保存客户端IP地址
for(i=0;i<maxthread;++i)
{
if(info[i].sockfd==-1)//遍历线程,找到最小的空闲描述符,i为其结构体标志
{
break;
}
}
if(i==maxthread)break;//如果线程已经满了,不接受连接退出循环
// 上锁,在没有解锁之前,pthread_mutex_lock()会阻塞
pthread_mutex_lock(&mutex);
//获得一个已经建立的连接
info[i].sockfd = accept(sockfd, (struct sockaddr*)&info[i].addr, &cli_len);
// 打印客户端的 ip 和端口
inet_ntop(AF_INET, &info[i].addr.sin_addr, cli_ip, INET_ADDRSTRLEN);
printf("--------client ip=%s,port=%d\n", cli_ip,ntohs(info[i].addr.sin_port));
if(info[i].sockfd > 0)
{
//给回调函数传的参数,&connfd,地址传递
pthread_create(&info[i].id, NULL, client_process, (void *)&info[i]); //创建线程
pthread_detach(info[i].id); // 线程分离,结束时自动回收资源
}
}
close(sockfd);
return 0;
}
2018-04-18 21:03:31 leikun153 阅读数 264
  • C++多线程编程视频教程(C++11多线程并发

    线程与进程相比,它是一种花销小,切换快,更节俭的多任务的操作方式。多编程并发在企业中开发显得尤为重要,本课程包含Windows多线程编程与C++11高并发编程,通过浅显易懂的代码与讲解,让你的多线程编程能力得到质的飞跃,具备开发高并发代码的能力!

    3109 人正在学习 去看看 黄强

一:服务器模型一般分为两种

1:循环服务器:服务器同一时刻只能响应一个客户端的请求

2:并发服务器:服务器在同一时刻可以响应多个客户端的请求

二:并发服务器的三中实现方式

1:多进程并发服务器

是指TCP连接后,每一个客户机的请求并不由服务器直接处理,而是由服务器创建一个子进程来处理

2:多线程并发服务器

多线程服务器是对多进程服务器的改进,由于多进程服务器在创建进程时要消耗较大的系统资源,所以用线程来取代进程,这样的服务处理程序可以较快的创建。据统计,创建线程要比创建进程要快100~1000倍。线程与进程不同的是:一个进程内的所有线程共享相同的全局内存,全局变量等信息。

是指TCP连接后,每一个客户机的请求并不由服务器直接处理,而是由服务器创建一个子线程来处理

3:多路复用I/O

I/O是为了解决线程/进程阻塞在哪个i/o调用中,常用select或者pool。

三:多线程服务器

1:在多进程的编制模型中,各进程拥有独立的地址空间,减少了出错的概率,然而,fork调用却存在一些开销大,通信机制困难的问题

2:使用多线程服务器的的优点是线程的开销小,切换容易。但是由于共享相同的内存区域,所以在对共享数据进行操作时,要注意同步关系,即线程运行要区分先后。其中,线程特定的数据虽然实现起来比较麻烦,但是它是将一个非线程安全函数转换成安全函数的常用方法

3:多线程可以解决多进程的问题,( 开销大,进程间通信困难(IPC机制) ),有时线程也称之为轻量级的进程,线程的创建可能比进程快10~100倍。通一个进程内的线所有线程共享相同的全局内存,这使得线程之间易于共享信息,但随之而来的是线程的安全问题。

4:在某些情况下,我们需要多个线程共享全局数据,在访问这些数据时,就需要用到同步锁机制。



















2017-02-28 20:38:43 Apollon_krj 阅读数 2602
  • C++多线程编程视频教程(C++11多线程并发

    线程与进程相比,它是一种花销小,切换快,更节俭的多任务的操作方式。多编程并发在企业中开发显得尤为重要,本课程包含Windows多线程编程与C++11高并发编程,通过浅显易懂的代码与讲解,让你的多线程编程能力得到质的飞跃,具备开发高并发代码的能力!

    3109 人正在学习 去看看 黄强

同类文章:
基于Linux的SOCKET编程之TCP半双工Client-Server聊天程序
基于Linux的Socket编程之TCP全双工Server-Client聊天程序
高并发服务器编程之多进程并发服务器

一、多线程服务器分析:

多进程并发与多线程并发实现过程差不多,只是多线程的同步、资源回收与多进程还是有很多区别的。多进程不需要记录子进程的信息,而多线程需要记录。
或许需要将子线程设置为分离态(pthread_detach()的效率,不如在创建线程之前就设置好attr属性高)。当然我们也可以在主线程中创建一个线程专门用来回收用于通信的线程,这点和多进程基本相同,而我们采用创建线程之前就设置好attr属性为分离状态的方法回收结束的子线程。

在多进程中说过C/S心跳机制,多线程也可以使用,但是需要注意:本地127.0.0.1测试时,不会经过网卡,在经过过滤器时便直接接到目的端口。所以说单机无法测试C/S心跳机制。

二、多线程服务器测试代码:

multithread.h:

/*multithread.h*/
#ifndef _MULTI_THREAD_H_
#define _MULTI_THREAD_H_

#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<unistd.h>
#include<sys/socket.h>
#include<sys/types.h>
#include<sys/stat.h>
#include<netinet/in.h>
#include<arpa/inet.h>

#define BUF_SIZE 1024

#endif

1、server服务器端:

(1)、server.c:

/*server.c*/
#include<multithread.h>
#include<server.h>

void sys_err(const char * ptr_err)
{
    perror(ptr_err);
    exit(EXIT_FAILURE);
}

void socket_server_create(const char * ipaddr, const char * port)
{
    struct sockaddr_in seraddr;
    int listenfd, ret;

    if((listenfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
        sys_err("socket create");

    seraddr.sin_family = AF_INET;
    seraddr.sin_addr.s_addr = inet_addr(ipaddr);
    seraddr.sin_port = htons(atoi(port));

    if( (ret = bind(listenfd, (struct sockaddr*)&seraddr, sizeof(seraddr))) < 0)
        sys_err("bind server address");

    if((ret = listen(listenfd, BACKLOG_THREAD)) < 0)
        sys_err("listen");

    accept_conn(listenfd);/*处理客户端链接的接收工作*/
    close(listenfd);
}

void accept_conn(const int listenfd)
{
    int connfd, i = 0;
    pthread_t tid;
    socklen_t addrlen;
    pthread_attr_t attr;//线程状态信息
    struct thread_info thread[MAX_THREAD_NUM];//存储线程信息,用于函数指针传参
    struct sockaddr_in cliaddr;//客户端信息

    pthread_attr_init(&attr);/*初始化线程属性*/
    /*设置为分离状态,也可以在线程处理函数中,用pthread_detach()进行分离状态设置,但是效率稍差*/
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

    while(1){
        addrlen = sizeof(cliaddr);
again:  
        if((connfd = accept(listenfd, (struct sockaddr*)&cliaddr, &addrlen)) < 0){
            if(errno == EINTR || errno == ECONNABORTED) 
                goto again;
            else
                sys_err("accept");
        }
        /*保存连接到的客户端信息到结构体中以便传参*/
        thread[i].connfd = connfd;
        thread[i].cliaddr = cliaddr;
        /*创建线程并且传递接收到的cilent信息到线程处理函数*/
        pthread_create(&tid, &attr, deal_conn, (void *)&thread[i]);
        pthread_attr_destroy(&attr);//线程资源使用完后释放资源

        i++;
        if(i == MAX_THREAD_NUM){//超过线程总数限制,服务器主线程结束
            printf("Too many connect!\n");
            pthread_exit((void *)(-1));
        }
    }
}
void * deal_conn(void *arg)
{
    struct thread_info * conn = (struct thread_info *)(arg);//无类型指针转换为自定义客户端信息类型指针
    int ret;
    char recvbuf[BUF_SIZE] = {};//接受缓冲区
    char sendbuf[BUF_SIZE] = {};//回射缓冲区
    char ipaddr[IPADDR_SIZE] = {};
    int port = ntohs((conn->cliaddr).sin_port);//客户端端口
    strcpy(ipaddr, inet_ntoa((conn->cliaddr).sin_addr));//客户端IP地址

    int fd = open("sersock.log", O_RDWR);
    lseek(fd, 0, SEEK_END);
    sprintf(recvbuf, "%s:%d\n", ipaddr, port);
    if( (ret = write(fd, recvbuf, strlen(recvbuf))) <0 )//将客户端登录状态写入日志文件
        sys_err("write sersock.log");
    close(fd);

    printf("%s:%d is connect success!\n", ipaddr, port);
    while(1){
        bzero(recvbuf, strlen(recvbuf));
        bzero(sendbuf, strlen(sendbuf));

        if((ret = read(conn->connfd, recvbuf, sizeof(recvbuf))) == 0){/*读取客户端发送信息*/
            printf("The client %s:%d is over!\n", ipaddr, port);
            break;
        }
        sprintf(sendbuf, "Server recv messge:%s", recvbuf);//拼接一部分回射提示信息
        if( (ret = write(conn->connfd, sendbuf, strlen(sendbuf))) < 0)/*回射*/
            sys_err("write");
    }
    close(conn->connfd);
    pthread_exit(NULL);/*不需要返回值*/
}

(2)、server.h:

/*server.h*/
#ifndef _SERVER_H_
#define _SERVER_H_

#include<multithread.h>
#include<pthread.h>
#include<fcntl.h>
#include<errno.h>

#define BACKLOG_THREAD 32
#define MAX_THREAD_NUM 256
#define IPADDR_SIZE 15

struct thread_info{
    struct sockaddr_in cliaddr;
    int connfd;
};

void socket_server_create(const char *, const char *);
void accept_conn(const int);
void * deal_conn(void *);

#endif

(3)、server_main.c:

/*server_main.c*/
#include<multithread.h>
#include<server.h>

int main(int argc, char **argv)
{
    if(argc < 3 ){
        printf("Too few parameter!\n");
        exit(EXIT_FAILURE);
    }

    socket_server_create(argv[1], argv[2]);
    return 0;
}

(4)、Makefile文件:

CPPFLAGS= -I ../ -I ./
CFLAGS= -g -Wall
#指定线程函数链接库
LDFLAGS= -lpthread
CC=gcc

src = $(wildcard *.c)
obj = $(patsubst %.c,%.o,$(src))
target = server

$(target):$(obj)
    $(CC) $^ $(LDFLAGS) -o $@

%.o:%.c
    $(CC) -c $< $(CFLAGS) $(CPPFLAGS) -o $@

.PHONY:clean
clean:
    -rm -f server
    -rm -f *.o

2、client客户端:

(1)、client.c:

/*client.c*/
#include<multithread.h>
#include<client.h>

void sys_err(const char * ptr_err)
{
    perror(ptr_err);
    exit(EXIT_FAILURE);
}
void socket_client_create(const char * ipaddr, const char * port)
{
    struct sockaddr_in serveraddr;

    int sockfd = socket(AF_INET, SOCK_STREAM, 0);

    serveraddr.sin_family = AF_INET;
    serveraddr.sin_addr.s_addr = inet_addr(ipaddr);
    serveraddr.sin_port = htons(atoi(port));

    int ret = connect(sockfd, (struct sockaddr *)&serveraddr, sizeof(serveraddr));
    if(ret < 0)
        sys_err("connect");

    deal_connect(sockfd);

    close(sockfd);
}

void deal_connect(const int sockfd)
{
    char sendbuf[BUF_SIZE] = {};
    char recvbuf[BUF_SIZE] = {};
    int ret;
    while( fgets(sendbuf, sizeof(sendbuf), stdin) ){
        if((ret = write(sockfd, sendbuf, strlen(sendbuf))) < 0)/*发送信息*/
            sys_err("write sockfd");
        if((ret = read(sockfd, recvbuf, sizeof(recvbuf))) < 0)/*接收回射信息*/
            sys_err("read");
        if((ret = write(STDOUT_FILENO, recvbuf, ret)) < 0)/*写到stdout*/
            sys_err("write stdout");
        bzero(sendbuf, strlen(sendbuf));
        bzero(recvbuf, strlen(recvbuf));
    }
}

(2)、client.h:

/*client.h*/
#ifndef _CLIENT_H_
#define _CLIENT_H_

#include<multithread.h>

void socket_client_create(const char *, const char *);
void deal_connect(const int);

#endif

(3)、client_main.c:

/*client_main.c*/
#include<multithread.h>
#include<client.h>

int main(int argc, char ** argv)
{
    if(argc < 3){
        printf("Too few parameter!\n");
        exit(EXIT_FAILURE);
    }
    socket_client_create(argv[1], argv[2]);
    return 0;
}

(4)、Makefile文件:

CPPFLAGS= -I ../ -I ./
CFLAGS= -g -Wall
LDFLAGS= 
CC=gcc

src = $(wildcard *.c)
obj = $(patsubst %.c,%.o,$(src))
target = client

$(target):$(obj)
    $(CC) $^ $(LDFLAGS) -o $@

%.o:%.c
    $(CC) -c $< $(CFLAGS) $(CPPFLAGS) -o $@

.PHONY:clean
clean:
    -rm -f client
    -rm -f *.o

缺点:未处理线程的同步问题(对日志文件的加锁)。

三、测试结果截图分析:

1、测试客户端登录与新信息接收功能,以及查看日志文件的记录信息:
这里写图片描述

2、查看线程、进程间关系,以及进程的tcp连接状态:
这里写图片描述

没有更多推荐了,返回首页