精华内容
下载资源
问答
  • 基于C语言的线程通信消息队列实现

    千次阅读 2019-01-09 22:46:20
    多线程编程中经常需要进行线程与线程间的通信,由于线程间能够共享数据结构,也就是一个全局变量能够被两个线程同时候使用。但是要注意的是线程的同步和互斥。 线程同步是指线程之间所具有的一种制约关系,一个...

             在多线程编程中经常需要进行线程与线程间的通信,由于线程间能够共享数据结构,也就是一个全局变量能够被两个线程同时候使用。但是要注意的是线程的同步互斥
             线程同步是指线程之间所具有的一种制约关系,一个线程的执行依赖另一个线程的消息,当它没有得到另一个线程的消息时应等待,直到消息到达时才被唤醒。
             线程互斥是指当有若干个线程都要使用某一共享资源时,任何时刻最多只允许一个线程去使用,其它要使用该资源的线程必须等待,直到占用资源者释放该资源。一般采用互斥锁来解决互斥的问题,但是使用时一定要注意避免死锁。

             在接收回调数据的时候,不能进行太过耗时的处理,通常将数据拷贝至消息队列,在其他线程进行处理。如下为项目总结的消息队列和使用示例:

    msg_process.c

    #include <stdlib.h>
    #include <stdio.h>
    #include <string.h>
    #include <sys/time.h>
    #include "msg_process.h"
     
    static unsigned long long timeout_ns = 0;
    
    #define LOG_ERR(fmt, ...) do{\
    	printf("[ERROR]  "fmt"  [line:%d] [%s]\n", ##__VA_ARGS__, __LINE__, __FUNCTION__);\
    }while(0);
     
    #define LOG_WARN(fmt, ...) do{\
    	printf("[WARNING]  "fmt"  [line:%d] [%s]\n", ##__VA_ARGS__, __LINE__, __FUNCTION__);\
    }while(0); 
     
    /**
     * 消息处理模块内部接口
     */
    static void put_msg_to_buffer(tmsg_buffer* buf, tmsg_element* elm){
    	if (NULL == buf || NULL == elm) {
    		LOG_ERR("buf or elm is NULL");
    		return;
    	}
     
    	if (NULL != elm->next) {
    		elm->next = NULL;
    	}
     
    	pthread_mutex_lock(&buf->mutex);
    	//缓冲区尚无消息节点
    	if (buf->first == buf->last
    			&& 0 == buf->num) {
    		buf->first = elm;
    		buf->last = elm;
    		buf->num ++;
    		//TODO:通知等待消息而阻塞的线程
    		pthread_cond_signal(&buf->not_empty);
    	} else {
    		//将新的消息节点信息添加到缓冲区的尾部
    		buf->last->next = elm;
    		buf->last = elm;
    		buf->num ++;
    	}
     
    	pthread_mutex_unlock(&buf->mutex);
    }
     
     
    static tmsg_element* get_msg_from_buffer(tmsg_buffer* buf, int block){
    	tmsg_element *elm = NULL;
     
    	if (NULL == buf) {
    		LOG_ERR("buf is NULL");
    		return NULL;
    	}
     
    	pthread_mutex_lock(&buf->mutex);
    	//缓冲区中无消息节点
    	while (0 == buf->num) {
    		//阻塞线程等待消息节点
    		pthread_cond_wait(&buf->not_empty, &buf->mutex);
    	}
    	//从缓冲区首部取出消息节点
    	elm = buf->first;
    	if (1 == buf->num) {
    		buf->first = buf->last = NULL;
    		buf->num = 0;
    	} else {
    		buf->first = buf->first->next;
    		buf->num --;
    	}
     
    	pthread_mutex_unlock(&buf->mutex);
     
    	return elm;
    }
     
     
     
    static tmsg_element* get_msg_from_buffer_timeout(tmsg_buffer* buf, int block/*ms*/){
    	tmsg_element *elm = NULL;
    //	struct timeval timenow;
    	struct timespec timeout;
     
    	if (NULL == buf) {
    		LOG_ERR("buf is NULL");
    		return NULL;
    	}
     
    	pthread_mutex_lock(&buf->mutex);
    	//缓冲区中无消息节点
    	if (0 == buf->num) {
    #if 1
    		clock_gettime(CLOCK_MONOTONIC, &timeout);
    		timeout.tv_sec = timeout.tv_sec + block/1000; //加上秒数
    		block %= 1000;	//得到毫秒数
    
    		timeout_ns = timeout.tv_nsec + block*1000*1000;
    		if( timeout_ns >= 1000*1000*1000 ) //若超过1s
    		{
    			timeout.tv_sec ++;
    			timeout.tv_nsec = timeout_ns - 1000*1000*1000;
    		}
    		else
    			timeout.tv_nsec = timeout_ns;
    #else	//解决系统时间改变导致消息队列阻塞的bug
    		gettimeofday(&timenow,NULL);
    		timeout.tv_sec = timenow.tv_sec + block/1000; //加上秒数
    		block %= 1000;	//得到毫秒数
    
    		timeout_ns = timenow.tv_usec*1000 + block*1000*1000;
    		if( timeout_ns >= 1000*1000*1000 ) //若超过1s
    		{
    			timeout.tv_sec ++;
    			timeout.tv_nsec = timeout_ns - 1000*1000*1000;
    		}
    		else
    			timeout.tv_nsec = timeout_ns;
    #endif
    		//带超时时间阻塞线程等待消息节点
    		pthread_cond_timedwait(&buf->not_empty, &buf->mutex, &timeout);
    	}
     
    	if (buf->num > 0) {
    		//从缓冲区首部取出消息节点
    		elm = buf->first;
    		if (1 == buf->num) {
    			buf->first = buf->last = NULL;
    			buf->num = 0;
    		} else {
    			buf->first = buf->first->next;
    			buf->num --;
    		}
    	}
     
    	pthread_mutex_unlock(&buf->mutex);
     
    	return elm;
    }
     
     
    static tmsg_element* clear_msg_buffer(tmsg_buffer* buf){
    	tmsg_element* elm = NULL;
    	tmsg_element* elm_tmp = NULL;
     
    	if (NULL == buf){
    		LOG_ERR("buf is NULL");
    		return NULL;
    	}
     
    	//清空buffer中当前消息节点之前的所有消息节点
    	pthread_mutex_lock(&buf->mutex);
    	if (buf->num > 0) {
    		elm = buf->first;
    		while(elm != NULL) {
    			//首尾指针指向同一消息节点
    			if (elm == buf->last) {
    				buf->first = buf->last;
    				if (buf->num != 1) {
    					buf->num = 1;
    				}
    				break;
    			}
     
    			elm_tmp = elm->next;
    			free_tmsg_element(elm);
    			buf->num --;
    			elm = elm_tmp;
    			buf->first = elm;
    		}
    	}
     
    	pthread_mutex_unlock(&buf->mutex);
     
    	return elm;
    }
     
     
    static void send_msg_to_buffer(tmsg_buffer* buf, int msg, int ext, char* str, int len)
    {
    	tmsg_element *elm = NULL;
     
    	elm = (tmsg_element *)malloc(sizeof(tmsg_element));
    	if (NULL == elm) {
    		LOG_ERR("new msg element failed!!");
    		return;
    	}
     
    	if(len > TMSG_MAX_LEN) //限制最大申请长度
    	{
    		len = TMSG_MAX_LEN;
    		LOG_WARN("Data is truncated,which must less than %d!",TMSG_MAX_LEN);
    	}
    	//填充消息节点数据
    	memset(elm, 0, sizeof(tmsg_element));
    	elm->msg = msg;
    	elm->ext = ext;
    	elm->dt = NULL;
    	elm->sub0 = 0;
    	elm->sub1 = 0;
    	elm->dt_len = len;
    	if (str) 
    	{
    		elm->dt = (char *)malloc(len);  //根据发送的大小申请内存
    		if(elm->dt == NULL)
    		{
    			LOG_ERR("new element->dt failed!!");
    			free_tmsg_element(elm);
    			return;
    		}	
    		else
    			memmove(elm->dt, str, len);
    	}
    	
    	elm->next = NULL;
    	//将消息节点添加到缓冲区中
    	put_msg_to_buffer(buf, elm);
    }
     
     
    static void send_msg_to_buffer_ex(tmsg_buffer* buf, int msg, int ext, int sub0, int sub1, char* str, int len){
    	tmsg_element *elm = NULL;
     
    	elm = (tmsg_element *)malloc(sizeof(tmsg_element));
    	if (NULL == elm) {
    		LOG_ERR("new msg element failed!!");
    		return;
    	}
    	
    	if(len > TMSG_MAX_LEN) //限制最大申请长度
    	{
    		len = TMSG_MAX_LEN;
    		LOG_WARN("Data is truncated,which must less than %d!",TMSG_MAX_LEN);
    	}
    	
    	//填充消息节点数据
    	memset(elm, 0, sizeof(tmsg_element));
    	elm->msg = msg;
    	elm->ext = ext;
    	elm->sub0 = sub0;
    	elm->sub1 = sub1;
    	elm->dt = NULL;
    	elm->dt_len = len;
    	if (str) 
    	{
    		elm->dt = (char *)malloc(len);  //根据发送的大小申请内存
    		if(elm->dt == NULL)
    		{
    			LOG_ERR("new element->dt failed!!");
    			free_tmsg_element(elm);
    			return;
    		}	
    		else
    			memmove(elm->dt, str, len);
    	}
    	elm->next = NULL;
    	//将消息节点添加到缓冲区中
    	put_msg_to_buffer(buf, elm);
    }
     
     
    static void dispose_msg_buffer(tmsg_buffer* buf){
    	tmsg_element* elm = NULL;
     
    	if (NULL == buf) {
    		return;
    	}
     
    	if (buf->first != buf->last
    			&& buf->num > 0) {
    		elm = clear_msg_buffer(buf);
    	} else {
    		elm = buf->last;
    	}
     
    	if (NULL != elm) {
    		free_tmsg_element(elm);
    		buf->first = buf->last = NULL;
    		buf->num = 0;
    	}
     
    	pthread_mutex_destroy(&buf->mutex);
    	pthread_cond_destroy(&buf->not_empty);
    	free(buf);
     
    	buf = NULL;
    }
     
     
    static int get_msg_num(tmsg_buffer* buf){
    	if (NULL == buf) {
    		return 0;
    	}
     
    	return buf->num;
    }
     
     
    /**
     * 以下为消息处理模块对外接口
     */
     
    /*消息缓冲区初始化*/
    tmsg_buffer* msg_buffer_init(void){
    	tmsg_buffer* msg_buffer = NULL;
    	pthread_condattr_t cattr;
     
    	msg_buffer = (tmsg_buffer *)malloc(sizeof(tmsg_buffer));
    	if (NULL == msg_buffer){
    		LOG_ERR("init msg buffer failed!!");
    		return NULL;
    	}
     
    	//初始化成员变量和函数
    	memset(msg_buffer, 0, sizeof(tmsg_buffer));
    	msg_buffer->first = NULL;
    	msg_buffer->last = NULL;
    	msg_buffer->num = 0;
     
    	pthread_mutex_init(&(msg_buffer->mutex), NULL);
    #if 1
    	pthread_condattr_init(&cattr);
    	pthread_condattr_setclock(&cattr, CLOCK_MONOTONIC);
    	pthread_cond_init(&(msg_buffer->not_empty), &cattr);
    #else
    	pthread_cond_init(&(msg_buffer->not_empty), NULL);
    #endif
     
    	//继续绑定接口
    	msg_buffer->put = put_msg_to_buffer;
    	msg_buffer->get = get_msg_from_buffer;
    	msg_buffer->get_timeout = get_msg_from_buffer_timeout;
    	msg_buffer->clear = clear_msg_buffer;
    	msg_buffer->sendmsg = send_msg_to_buffer;
    	msg_buffer->sendmsgex = send_msg_to_buffer_ex;
    	msg_buffer->dispose = dispose_msg_buffer;
    	msg_buffer->getnum = get_msg_num;
     
    	return msg_buffer;
    }
     
     
    /*复制消息节点*/
    tmsg_element* dup_msg_element(tmsg_element* elm){
    	tmsg_element* msg_element = NULL;
    	if (NULL == elm) {
    		LOG_ERR("msg element is NULL!!");
    		return NULL;
    	}
     
    	msg_element = (tmsg_element *)malloc(sizeof(tmsg_element));
    	if (NULL == msg_element) {
    		LOG_ERR("create msg element is failed!!");
    		return NULL;
    	}
     
    	memcpy(msg_element, elm, sizeof(tmsg_element));
     
    	return msg_element;
    }
     
    void free_tmsg_element(tmsg_element *msg_element)
    {
    	if(msg_element != NULL)
    	{
    		if(msg_element->dt != NULL)
    		{
    			free(msg_element->dt);
    			msg_element->dt = NULL;
    		}
    		free(msg_element);
    		msg_element = NULL;
    	}
    }
     
    

    msg_process.h

    #ifndef _MSG_PROCESS_H_
    #define _MSG_PROCESS_H_
    #include <pthread.h>
    
    
    #define TMSG_MAX_LEN 4096 //最大限制为4K
    
    typedef struct msg_element tmsg_element;
     
    struct msg_element
    {
        tmsg_element* next;
        int msg;
        int ext;
        int sub0;
        int sub1;
    	int dt_len;
        char *dt;
    };
      
    typedef struct msg_buffer tmsg_buffer;
    
    struct msg_buffer
    {
        tmsg_element* first;
        tmsg_element* last;
        int             num;
     
        pthread_mutex_t mutex;
        pthread_cond_t  not_empty;
     
        void (*put)(tmsg_buffer* buf, tmsg_element* elm);
        tmsg_element* (*get)(tmsg_buffer* buf, int block);
        tmsg_element* (*get_timeout)(tmsg_buffer* buf, int block);
     
        tmsg_element* (*clear)(tmsg_buffer* buf);
        void (*sendmsg)(tmsg_buffer* buf, int msg, int ext, char* str, int len);
        void (*sendmsgex)(tmsg_buffer* buf, int msg, int ext, int sub0, int sub1, char* str, int len);
        void (*dispose)(tmsg_buffer* buf);
     
        int (*getnum)(tmsg_buffer* buf) ;
    };
     
     
     
    /*消息缓冲区初始化*/
    tmsg_buffer* msg_buffer_init(void);
     
     
    /*复制消息节点*/
    tmsg_element* dup_msg_element(tmsg_element* elm);
     
    /*释放消息节点*/
    void free_tmsg_element(tmsg_element *msg_element);
     
     
    #endif /* MESSAGE_MSG_CENTER_H_ */
    

    example.c  //调用示例

    #include <stdio.h>
    #include <unistd.h>
    #include "msg_process.h"
     
    int main(int argc,char *argv[])
    {
     
    	tmsg_buffer* test_msg_buff = NULL;
    	test_msg_buff = msg_buffer_init();
    	char table[] = "{\"hello\",\"world\",\"hello\",\"world\",\"hello\",\"world\",\"hello\",\"world\",\"hello\","
    				   "\"world\",\"hello\",\"world\",\"hello\",\"world\",\"hello\",\"world\",\"hello\",\"world\""
    				   ",\"hello\",\"world\",\"hello\",\"world\",\"hello\",\"world\",\"hello\",\"world\",\"hello\",\"world\""
    				   ",\"hello\",\"world\",\"hello\",\"world\",\"hello\",\"world\",\"hello\",\"world\",\"hello\",\"world\""
    				   ",\"hello\",\"world\",\"hello\",\"world\",\"hello\",\"world\",\"hello\",\"world\",\"hello\",\"world\"}";
    	test_msg_buff->sendmsg(test_msg_buff,0,0,table,sizeof(table)); //发送数据
     
    	sleep(2);
     
    	tmsg_element* event = NULL;
    	event = test_msg_buff->get_timeout(test_msg_buff,1000);  //接收数据
    	if(event != NULL)
    	{
    		if(event->dt != NULL)
    		{
    			int i = 0;
    			printf("Recv:");
    			for(i=0; i<event->dt_len; i++)
    			{
    				printf("%c",event->dt[i]);
    			}
    			printf("\n");
    		}
    	}
    	free_tmsg_element(event);
    	
        return 0;
    }

    执行结果

    展开全文
  • 简单的使用消息队列多线程通信

    千次阅读 2013-06-22 11:05:32
    下面是简单的两个进程的通信,简称server.c 和 client.c server.c #include #include #include #include #include #include #include #include #include #include #include #include #include #...

    下面是简单的两个进程的通信,简称server.c   和 client.c

    server.c

    #include <stdio.h>
    #include <stdlib.h>
    #include <pthread.h>
    #include <unistd.h>
    #include <iostream>
    #include <string.h>
    #include <stdlib.h>
    #include <sys/types.h>
    #include <sys/ipc.h>
    #include <sys/msg.h>
    #include <signal.h>
    #include <errno.h>
    #include <sys/wait.h>
    #define SIZE 256
    using namespace std;
    struct msg{
        long mtype;
        char text[SIZE];
    };
    pthread_t sendtid,recvtid;
    void* serverSend(void* arg)
    {
        int msgid =(int)arg;
        sendtid = pthread_self();//获取线程号
        char buf[SIZE];
        struct msg ms;
        while(1){
            memset(buf,0,sizeof(buf));
            cout<<"input <<<<<<< :";
            fgets(buf,SIZE,stdin);
            buf[strlen(buf)-1] ='\0';
            strcpy(ms.text,buf);
            ms.mtype = 1;          //发送类型为1    
            if( !strcmp(buf,"quit")){ //如果输入quit   break
         		 if(msgsnd(msgid,&ms,strlen(ms.text),0) < 0){
               		 perror("failed to send message");
              	  	 exit(-1);
           		 }
    		break;
    	} 
    
            if(msgsnd(msgid,&ms,strlen(ms.text),0) < 0){
                perror("failed to send message");
                exit(-1);
            }
        }
        pthread_cancel(recvtid);    //终止接收线程
        return ((void*)1);
    }
    void* clientRecv(void *arg)
    {
        int msgid =(int)arg;
        recvtid = pthread_self();//获取线程号
        struct msg ms;
        while(1){
            memset(&ms,0,sizeof(ms));
            if(msgrcv(msgid,&ms,sizeof(ms.text),2,0) < 0){
                perror("failed to get message");
    	    pthread_cancel(sendtid);  
                exit(-1);
            }	
    	if(!strcmp(ms.text,"quit")){           //如果收到quit   break
    		break;
    	}
           	printf("\routput------>%s\n",ms.text);
    	printf("input------<");
    	fflush(stdout);
        }
        pthread_cancel(sendtid);     //终止发送线程
        return ((void*)1);
    }
    int getMessQueueId()
    {
        key_t key;
        int msg_id;
        if((key = ftok(".",'a')) < 0){
            perror("failed to get key");
            exit(-1);
        }
        if( (msg_id = msgget(key,0666 | IPC_CREAT)) < 0){
            perror("failed to get msid");
            exit(-1);
        }
        return msg_id;
    }
    int main()
    {
        pthread_t tid1;
        pthread_t tid2;
        int msgid = getMessQueueId();
        int err;
        void *arg =(void *)msgid;
        err = pthread_create(&tid1,NULL,serverSend,arg);
        if(err != 0)
        {
            cout<<"error"<<endl;
            exit(-1);
        }
        err = pthread_create(&tid2,NULL,clientRecv,arg);
        err = pthread_join(tid1,NULL);    
        if(err != 0)
        {
            cout<<"error"<<endl;
            exit(-1);
        }
        err = pthread_join(tid2,NULL);
        if(err != 0)
        {
            cout<<"error"<<endl;
            exit(-1);
        }
        char tempbuf[20];
        sprintf(tempbuf,"ipcrm -q %d",msgid);    //删除消息标识符     
        system(tempbuf);
    
        return 0;
    }


    client.c

    #include <stdio.h>
    #include <stdlib.h>
    #include <pthread.h>
    #include <unistd.h>
    #include <iostream>
    #include <string.h>
    #include <stdlib.h>
    #include <sys/types.h>
    #include <sys/ipc.h>
    #include <sys/msg.h>
    #include <signal.h>
    #include <errno.h>
    #include <sys/wait.h>
    #define SIZE 256
    using namespace std;
    struct msg{
        long mtype;
        char text[SIZE];
    };
    
    pthread_t sendtid,recvtid;
     void  killAnotherthread(pthread_t tid)
    {
        if(pthread_kill(tid,0) != ESRCH)           //kill发送线程
    	{
    		cout<<"thread exist"<<endl;
    		pthread_kill(tid,SIGQUIT);
    		cout<<"after kill"<<endl;
    	}
    }
    void* serverSend(void* arg)
    {
        int msgid =(int)arg;
        sendtid = pthread_self();         //获取线程号
        //fprintf(stdout,"msgid = %d\n",msgid);
        char buf[SIZE];
        struct msg ms;
        while(1){
            memset(buf,0,sizeof(buf));
    	cout<<"input <<<<<<< :";
            fgets(buf,SIZE,stdin);
            buf[strlen(buf)-1] ='\0';
            strcpy(ms.text,buf);
            ms.mtype = 2;          //发送类型为2
            if( !strcmp(buf,"quit")) {    //如果输入quit   break
    	  if(msgsnd(msgid,&ms,strlen(ms.text),0) < 0){
                  perror("failed to send message");
                  exit(-1);
           	   }
    	   break;
    	}
    
            if(msgsnd(msgid,&ms,strlen(ms.text),0) < 0){
                perror("failed to send message");
                exit(-1);
            }
        } 
    	
        killAnotherthread(recvtid);   //  kill  接收线程
        return ((void*)1);
    }
    void* clientRecv(void *arg)
    {
        int msgid =(int)arg;
        recvtid = pthread_self();//获取线程号
        //fprintf(stdout,"msgid = %d\n",msgid);
        struct msg ms;
        while(1){
            memset(&ms,0,sizeof(ms));
            if(msgrcv(msgid,&ms,sizeof(ms.text),1,0) < 0){
                perror("failed to get message");	    
                exit(-1);
            }
    	if( !strcmp(ms.text,"quit")){
    		break;
    	}
    	printf("\r output------>%s\n",ms.text);
    	printf("input------<");
    	fflush(stdout);
        }
         killAnotherthread(sendtid);     //kill发送线程
    	
        return ((void*)1);
    }
    
    
    int getMessQueueId()
    {
        key_t key;
        int msg_id;
        if((key = ftok(".",'a')) < 0){
            perror("failed to get key");
            exit(-1);
        }
        if( (msg_id = msgget(key,0666 | IPC_CREAT)) < 0){
            perror("failed to get msid");
            exit(-1);
        }
        return msg_id;
    }
    int main()
    {
        pthread_t tid1;
        pthread_t tid2;
        int msgid = getMessQueueId();
        int err;
        void *arg =(void *)msgid;
        err = pthread_create(&tid1,NULL,serverSend,arg);
        //fprintf(stdout,"tid1 = %d\n",tid1);
        if(err != 0)
        {
            cout<<"error"<<endl;
            exit(-1);
        }
        err = pthread_create(&tid2,NULL,clientRecv,arg);
        err = pthread_join(tid1,NULL);
        //fprintf(stdout,"err = %d\n",err);
        if(err != 0)
        {
            cout<<"error"<<endl;
            exit(-1);
        }
        err = pthread_join(tid2,NULL);
        if(err != 0)
        {
            cout<<"error"<<endl;
            exit(-1);
        }
        char tempbuf[20];
        sprintf(tempbuf,"ipcrm -q %d",msgid);         //执行shell命令写入到tempbuf当中
        system(tempbuf);
    
        return 0;
    }


    展开全文
  • 至于python的进程间通信,暂时没有发现有类似linux ipc的模块可供使用,但是可以通过unix域套接字实现进程间通信。 1、用threading模块实现多线程 &nbsp; &nbsp; 可以通过派生threadin...
        python中实现多线程可以通过threading类。线程间同步则可以用queue类。至于python的进程间通信,暂时没有发现有类似linux ipc的模块可供使用,但是可以通过unix域套接字实现进程间通信。

    1、用threading模块实现多线程

        可以通过派生threading.Thread类来实现一个可独立控制的线程实例。用户需要(也只能)重写Thread类的__init__和run函数来实现自己的线程。线程类实例化后可以通过使用start函数使线程运行,该函数会在新开启的线程中调用run函数。这里需要注意的是,该类中只有run函数是出在独立的线程中运行的,其他控制函数都是处在调用者线程中执行的。其他线程可以通过调用Thread实例的join函数来阻塞自己等待线程运行结束。Thread类中name属性用来存放线程的名字。在python中主线程是一个名为“main thread”的Thread实例。
    1.1、Thread类中的函数
        start():启动线程。只能调用一次,否则会报错。该函数启动一个单独的线程运行run函数。
        run():通过重写这个函数来实现用户函数在线程中的运行。也可以在类实例化的时候直接指定运行函数,那样的话可以不用重写这个函数,
        join(timeout=None):调用这个函数的线程收到阻塞直到线程实例退出。timeout,一个浮点数,可以指定超时时间,单位秒。
        name存放线程的名字。可以更改,默认为Thread-n的格式。
       ident:存放线程标识,一个整数。在线程结束后依然存在(直到线程资源被回收)
       is_alive()判断线程是否存活。返回True或者False。调用模块函数 enumerate()可以得到所以存活的线程。
      daemon指定线程是否为守护线程

    1.2、线程实例
    1. import threading
    2. import Queue
    3. import time
    4. class mn_access(threading.Thread):
    5. def __init__(self,interval):
    6. threading.Thread.__init__(self)
    7. self.interval=interval
    8. self.thread_stop=False
    9. def run(self):#这个函数中存放用户自己的功能代码
    10. i=1;
    11. while not self.thread_stop:
    12. print("thread%d %s: i am alive hehe %d" %(self.ident,self.name,i))
    13. time.sleep(self.interval)
    14. i=i+1
    15. def stop(self):
    16. self.thread_stop = True
    17. if __name__ == "__main__":
    18. mn=mn_access(1)
    19. mn.start()#线程开始
    20. time.sleep(5)
    21. mn.stop()
    以上代码实现一个简单的线程实例,这段代码会生成一个线程并且周期性打印以证明自己存活。程序运行后打印:
    thread140109270083328 Thread-1: i am alive hehe 1
    thread140109270083328 Thread-1: i am alive hehe 2
    thread140109270083328 Thread-1: i am alive hehe 3
    thread140109270083328 Thread-1: i am alive hehe 4
    thread140109270083328 Thread-1: i am alive hehe 5

    2、Queue模块:线程间的消息队列


        queue模块用来实现消息队列功能,可以实现线程间安全的消息交换。各个线程可以通过调用消息队列实例对消息队列进行操纵。
    2.1、queue对象
        queue对象实现一个fifo队列(其他的还有lifo、priority队列,这里不再介绍)。queue只有qsize一个构造参数,用来指定队列容量,指定为0的时候代表容量无限。主要有以下成员函数:
        Queue.qsize():返回消息队列的当前空间。返回的值不一定可靠。
        Queue.empty():判断消息队列是否为空,返回True或False。同样不可靠。
        Queue.full():类似上边,判断消息队列是否满
        Queue.put(itemblock=Truetimeout=None):往消息队列中存放消息。block可以控制是否阻塞,timeout指定阻塞时候的等待时间。如果不阻塞或者超市,会引起一个full exception。
        Queue.put_nowait(item):相当于put(item, False).
        Queue.get(block=Truetimeout=None):获取一个消息,其他同put。
        以下两个函数用来判断消息对应的任务是否完成。
        Queue.task_done():接受消息的线程通过调用这个函数来说明消息对应的任务已完成。
        Queue.join():调用线程阻塞直到所有消息对应的任务已经完成。
        消息队列实例中维护的有待完成任务变量。每接收到一个消息该值自增一次。每调用一次.task_done()可以使该值减1,当待完成任务值为0的时候,join函数才会返回。
    2.2、 queue例子:
    1. import threading
    2. import Queue
    3. import time
    4. class worker(threading.Thread):
    5. def __init__(self,queue):
    6. threading.Thread.__init__(self)
    7. self.queue=queue
    8. self.thread_stop=False
    9. def run(self):
    10. while not self.thread_stop:
    11. print("thread%d %s: waiting for tast" %(self.ident,self.name))
    12. try:
    13. task=q.get(block=True, timeout=20)#接收消息
    14. except Queue.Empty:
    15. print("Nothing to do!i will go home!")
    16. self.thread_stop=True
    17. break
    18. print("task recv:%s ,task No:%d" % (task[0],task[1]))
    19. print("i am working")
    20. time.sleep(3)
    21. print("work finished!")
    22. q.task_done()#完成一个任务
    23. res=q.qsize()#判断消息队列大小
    24. if res>0:
    25. print("fuck!There are still %d tasks to do" % (res))
    26. def stop(self):
    27. self.thread_stop = True
    28. if __name__ == "__main__":
    29. q=Queue.Queue(3)
    30. worker=worker(q)
    31. worker.start()
    32. q.put(["produce one cup!",1], block=True, timeout=None)#产生任务消息
    33. q.put(["produce one desk!",2], block=True, timeout=None)
    34. q.put(["produce one apple!",3], block=True, timeout=None)
    35. q.put(["produce one banana!",4], block=True, timeout=None)
    36. q.put(["produce one bag!",5], block=True, timeout=None)
    37. print("***************leader:wait for finish!")
    38. q.join()#等待所有任务完成
    39. print("***************leader:all task finished!")

        以上程序实现一个简单的消费者-生产者模型。主线程通过消息队列向worker线程分配任务,worker线程完成任务后调用
    task_done()使得待完成任务数减一。主线程通过join等待生产者完成任务,生产者完成所有任务后主线程退出。代码输出:
    thread139958685849344 Thread-1: waiting for tast 1
    task recv:produce one cup! ,task No:1
    i am working
    work finished!
    fuck!There are still 3 tasks to do
    thread139958685849344 Thread-1: waiting for tast 1
    task recv:produce one desk! ,task No:2
    i am workingleader:wait for finish!
    work finished!
    fuck!There are still 3 tasks to do
    thread139958685849344 Thread-1: waiting for tast 1
    task recv:produce one apple! ,task No:3
    i am working
    work finished!
    fuck!There are still 2 tasks to do
    thread139958685849344 Thread-1: waiting for tast 1
    task recv:produce one banana! ,task No:4
    i am working
    work finished!
    fuck!There are still 1 tasks to do
    thread139958685849344 Thread-1: waiting for tast 1
    task recv:produce one bag! ,task No:5
    i am working
    work finished!
    thread139958685849344 Thread-1: waiting for tast 1
     ***************leader:all task finished!
    Nothing to do!i will go home!
    展开全文
  •  在linux多线程编程中,如果两个线程没用共同的数据区,则需要使用消息队列从一个线程往另一个线程发送消息(同样可以应用在进程间通信)  消息队列通过mq_open()创建和打开,此函数返回一个消息队列描述

    POSIX消息队列允许进程以消息的形式交换数据。此API与System V消息队列(msgget(2),msgsnd(2),msgrcv(2)等)有明显不同,但做的事情差不多。

    在linux多线程编程中,如果两个线程没用共同的数据区,则需要使用消息队列从一个线程往另一个线程发送消息(同样可以应用在进程间通信)

    消息队列通过mq_open()创建和打开,此函数返回一个消息队列描述符mqd_t,它用于之后的调用中引用打开的消息队列。每个消息队列由一个名字标识,两个进程可以操作同一个队列。

    消息通过调用mq_send()和mq_receive()传递。当一个进程结束使用该队列,则它调用mq_close(),当一个队列不再需要了,则可以调用mq_unlink()删除。队列属性可以调用mq_getattr()/mq_setattr()获取/修改。一个进程可以在一个空队列上调用mq_notify请求消息到达的异步通知。

    消息队列描述符引用到一个打开的消息队列(类比open(2))。fock(2)之后,子进程继承父进程队列描述符的拷贝,两个描述符都引用到父进程的那个描述符。两个进程持有的描述符共享与消息队列描述符相关联的标记(mq_flags)。

    下面以一个实例说明:

     创建并接收数据线程:

    #include<mqueue.h>
    #define WAIT_FOREVER -1
    
    typedef struct{
        UINT32 start;
        char name[32] = {0};  
    }MQ_SEND_MSG;                               //接收的消息结构体
    
    char nameMsg[32] = {0};
    UINT32 memAvail;        
    char chName;
    MQ_SEND_MSG recvMsg;
    extern mqd_t memAvailMsgQ;
    
    struct mq_addr mqa;
    mqa.mq_maxmsg = 20;
    mqa.mq_msgsize = sizeof(MQ_SEND_MSG);
    strcpy(nameMsg, "/memAvailMsgQ");
    mqUnlink(nameMsg);                                                                                                                                                              
    
    memAvailMsgQ = mq_open(nameMsg, O_CREAT|O_RDWR|O_EXCL, DEFAULT_MSG_MODE, &mqa);  //创建一个mq消息
    if(memAvailMsgQ == -1)
    {
        return error;    
    }                      
    
    FOREVER                                                 //循环等待另一个线程发送消息     
    {
        retval = mq_receive(memAvailMsgQ, (char *)&recvSendMsg, sizeof(MQ_SEND_MSG), WAIT_FOREVER, NULL);  
        if(retval == sizeof(MQ_SEND_MSG))                   //mqReceive返回接收到的数据的长度,以此判断有没有成功接收消息
        {
            memAvail = recvSendMsg.start;
            chName = recvSendMsg.name;
        }
    }   
    发送数据线程:

    #include<mqueue.h>
    
    typedef struct{
        UINT32 start;
        char name[32] = {0};  
    }MQ_SEND_MSG;                                //mq消息结构体
    
    mqd_t memAvailMsgQ;                         //mq消息句柄
    
    MQ_SEND_MSG mqSendMsg;
    mqSendMsg.start = memAvail;                     //发送的消息结构体成员
    strcpy(mqSendMsg.chName, "come on baby!");        //发送的消息结构体成员
    
    mq_send(memAvailMsgQ, (char *)&mqSendMsg, sizeof(MQ_SEND_MSG), 200,  0);   //发送数据


    展开全文
  • 在这部分,我们将使用ThreadPool 和MSMQ 进行消息收发。MSMQ 是一个分布式队列,通过MSMQ 一个应用程序可以异步地与另外一个应用程序通信。 在一个典型的场景中,我们要向维护一个队列的MSMQ 服务器发送消息,MSMQ...
  • 消息队列 多线程 linux

    千次阅读 2012-08-31 22:48:48
    这些天自己在学习消息队列使用,经过几天的琢磨,总算了解了怎么使用了,趁现在思路清晰把自己的过程记录下来供自己以后查阅。...这样,就实现了多线程之间的通信。 需要使用的头文件: #include #include
  • 不想在系统启动的时候注册b点的监听器,而且redis的订阅是阻塞式的,不想在web容器启动就使用多线程进行监听,请问还有什么其他的方式?目前想让a节点处理完成之后就通知b节点处理,b节点无需阻塞式的等待,系统内部...
  • 正所谓“师夷长技已治夷”,在我们使用C++来封装Linux下的多线程消息通信前,最好先学一下Windows中多线程是如何进行通信的。 Windows系统会为每个线程配备一个消息队列【Tips1】,其他线程others可以通过调用...
  • 一、为何要使用Netty作为高性能的通信库? 在看RocketMQ的RPC通信部分时候,可能有不少同学有这样子的疑问,RocketMQ为何要选择Netty而不直接使用JDK的NIO进行网络编程呢?这里有必要先来简要介绍下Netty。 Netty是...
  • 原先使用redhat 5.0下面写了一个smtp和POP3的程序,使用消息队列进行通信,pop3的那个程序在单进程的时候运行正常,但是做成多线程就会出现段错误直接退出,后面使用多进程勉强实现功能。后面由于某些原因,转战到...
  • 消息队列多线程 1,进程间通信: # 1.Queue的使用 # 步骤1:导入模块 from multiprocessing import Queue #步骤2:创建一个队列 q = Queue(3)#可接一个整数,表示队列的容量,如果省略则表示不设上限。 print...
  • 最近在多线程使用socket通信遇到的问题,场景是有多个线程需要用到同一个socket连接进行消息的发送,因为没有订具体的协议,所以对于发送(文本)消息,每次发送端发送1K字节,接收端每次接收1K字节,无效数据用0...
  • 最近在Hi3515上调试Qt与DVR程序,发现他们之间使用消息队列通信的,闲暇之余,就总结了一下消息队列,呵呵,自认为通俗易懂,同时,在应用中也发现了消息队列的强大之处。  关于线程的管理(互斥量和条件变量)见:...
  • 1. 消息队列 传送有格式的消息流 进程网状交叉通信,实现大规模数据通信 使用内核中的链表(实现...3、当往一个空队列放置一个消息时,posix消息队列允许产生一个信号或者启动一个线程,systemV消息队列则不提供类
  •  我的需求很简单,是个客户端连接到我的一个小型的数据转发服务器上,开始使用的是Socket通信实现这个功能,一旦数据服务器接收到来自不同客户端发来的消息,对这些消息进行处理(我这里是将数据接收到后再转发...
  • 进程网状交叉通信,实现大规模数据通信 使用内核中的链表(实现机制) posix的消息队列和系统V消息队列区别: 消息队列是随内核随机的 线程 什么是线程 线程是进程的一个实体,它是程序运行的最小单位 为什么要...
  • Python多线程通信

    2020-06-16 15:49:37
    Python多线程通信为什么需要通信通信方法共享变量共享变量介绍实现消息队列(Queue)实现通信消息队列介绍实现 为什么需要通信 在生产者和消费者的模型中,如果有一个生产者和多个消费者,消费者之间就需要线程节的...
  • Linux多线程编程与线程间通信机制

    千次阅读 2016-01-26 17:04:44
    Linux中多线程编程技术被广泛使用,这主要是因为多线程可以提升程序的运行效率和便利性。在现在的比较大一点的linux...本文就来讲讲linux中多线程编程的实现,以及利用消息队列进行线程间通信。   一、线程的创建
  • 线程间通信:由于多线程共享地址空间和数据空间,所以多个线程间的通信是一个线程的数据可以直接提供给其他线程使用,而不必通过操作系统。 所以线程间通信和同步的方式主要有锁、信号、信号量 进程间的通信则不同...
  • 多线程与并发-线程间通信 ...消息队列,socket编程等网络通信 wait/notify机制 基础 前提:多个线程使用用一把锁,在使用wait(),notify(),notifyAll()之前要先获取当前对象的锁。 wait():运行至wait处时...
  • 进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的 创建队列的类(底层就是以管道和锁定的方式实现): Queue([maxsize]):创建共享的...
  • 一、进程间的通信 1.Queue的使用 步骤1:导入模块 from multiprocessing import Queue 步骤2:创建一个队列 q = Queue(3)#可接一个整数,表示队列的容量,如果省略则表示不设上限。 print(q.empty())#True ...
  • 管道是一种半双工的通信方式,数据只能单向流动,而且只能在具有亲缘关系的进程间使用进程间的亲缘关系通常是指父子进程关系。 命名管道(named pipe/FIFO): 命名管道也是半双工的通信方式,但是它允许无亲缘关系...
  • Threadx 消息队列 queue

    2020-02-07 17:15:47
    每个线程可以创建消息队列,并且可以使用多消息队列线程通信消息队列不支持拥有者属性,也就是任何线程可以向消息队列发送或接收消息。应用开发者保证消息队列使用的合理性。 消息传递规则 1,任何...
  • rabbitMQ消息队列

    2019-03-17 17:38:23
    多线程使用queue实现生产者消费者模型, 同一个进程不同线程间通信 一个线程往队列中放数据 一个线程从队列中取数据 rabbitMQ介绍 what 本身就是一个队列, 只不过比较专业, 也叫作消息中间件, 消息队列 from rabbitMQ...
  • RabbitMQ消息队列

    2019-10-08 22:20:46
    ​ 在计算机科学中,消息队列是一种进程间通信或同一进程的不同线程间的通信方式,提供了异步的通信协议。 实现 消息队列常常保持在链表结构中,拥有权限的进程可以向消息队列中写入或读取消息。当前使用的...
  • 进程间的通信需要建立额外的通信机制(如:管道、消息队列、系统信号),而且不同进程间的切换还会触发CPU中断程序保存进程的CPU执行状态来切换到其他进程的执行,而后再切换回来时将从内存中恢复上一次的执行的CPU...
  • 线程间通信:由于多线程共享地址空间和数据空间,所以多个线程间的通信是一个线程的数据可以直接提供给其他线程使用,而不必通过操作系统(也就是内核的调度)。 进程的通信机制主要有:管道、有名管道、消息队列、...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 661
精华内容 264
关键字:

多线程使用消息队列通信