
Original article: http://www.cnblogs.com/luxiaoxun/p/4331110.html


Scalable IO in Java

http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf

Almost every networked service follows the same basic processing sequence:
Read request
Decode request
Process service
Encode reply
Send reply

Classic Service Designs

A simple implementation:

class Server implements Runnable {
    public void run() {
        try {
            ServerSocket ss = new ServerSocket(PORT);
            while (!Thread.interrupted())
                new Thread(new Handler(ss.accept())).start(); // spawn a new thread per connection
                // or, single-threaded, or a thread pool
        } catch (IOException ex) { /* ... */ }
    }
    
    static class Handler implements Runnable {
        final Socket socket;
        Handler(Socket s) { socket = s; }
        public void run() {
            try {
                byte[] input = new byte[MAX_INPUT];
                socket.getInputStream().read(input);
                byte[] output = process(input);
                socket.getOutputStream().write(output);
            } catch (IOException ex) { /* ... */ }
        }       
        private byte[] process(byte[] cmd) { /* ... */ }
    }
}

Each incoming request is dispatched to its own thread, and each thread runs the whole sequence above on its own.

Because blocking I/O leaves a thread waiting for the duration of each call, performance under this model drops off sharply as user load increases.
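The comment in the listing ("or, single-threaded, or a thread pool") hints at the first mitigation: bound the number of threads with a pool. A minimal sketch of that variant, reusing the Handler above (the port and pool size are placeholder values):

import java.io.IOException;
import java.net.ServerSocket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class PooledServer implements Runnable {
    static final int PORT = 9000; // placeholder
    public void run() {
        ExecutorService pool = Executors.newFixedThreadPool(100); // bounded worker pool
        try (ServerSocket ss = new ServerSocket(PORT)) {
            while (!Thread.interrupted())
                pool.execute(new Server.Handler(ss.accept())); // reuse pooled threads
        } catch (IOException ex) { /* ... */ }
        pool.shutdown();
    }
}

This caps the thread count, but every pooled thread still blocks on its socket, so the fundamental problem below remains.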

Where the server blocks:

1. ServerSocket.accept() blocks waiting for a client until one connects.

2. Reading from the socket InputStream blocks until data arrives.

3. Writing to the socket OutputStream blocks until all the data has been written.

Where the client blocks:

1. Establishing the connection blocks until the connection succeeds.

2. Reading from the socket input stream blocks when not enough data is available, until data arrives or the end of the stream is reached.

3. Writing to the socket output stream blocks until all the data has been written out.

4. With Socket.setSoLinger() configured, closing the socket blocks until all pending data has been sent or the linger timeout expires.

Improvement: adopt an event-driven design, in which a handler is invoked only when an event actually fires.

Basic Reactor Design

Implementation:

class Reactor implements Runnable { 
    final Selector selector;
    final ServerSocketChannel serverSocket;
    Reactor(int port) throws IOException { // Reactor setup
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        serverSocket.socket().bind(new InetSocketAddress(port));
        serverSocket.configureBlocking(false); // non-blocking
        SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT); // step 1: register interest in accept events
        sk.attach(new Acceptor()); // attach the Acceptor as the callback object
    }
    
    public void run() { 
        try {
            while (!Thread.interrupted()) {
                selector.select();
                Set<SelectionKey> selected = selector.selectedKeys();
                Iterator<SelectionKey> it = selected.iterator();
                while (it.hasNext())
                    dispatch(it.next()); // the Reactor dispatches each ready event
                selected.clear();
            }
        } catch (IOException ex) { /* ... */ }
    }
    
    void dispatch(SelectionKey k) {
        Runnable r = (Runnable)(k.attachment()); // look up the previously attached callback object
        if (r != null)
            r.run();
    }
    
    class Acceptor implements Runnable { // inner class
        public void run() {
            try {
                SocketChannel c = serverSocket.accept();
                if (c != null)
                    new Handler(selector, c);
            } catch (IOException ex) { /* ... */ }
        }
    }
}

final class Handler implements Runnable {
    final SocketChannel socket;
    final SelectionKey sk;
    ByteBuffer input = ByteBuffer.allocate(MAXIN);
    ByteBuffer output = ByteBuffer.allocate(MAXOUT);
    static final int READING = 0, SENDING = 1;
    int state = READING;
    
    Handler(Selector sel, SocketChannel c) throws IOException {
        socket = c; c.configureBlocking(false);
        // Optionally try first read now
        sk = socket.register(sel, 0);
        sk.attach(this); // attach this Handler as the callback object
        sk.interestOps(SelectionKey.OP_READ); // step 2: register interest in read events
        sel.wakeup();
    }
    boolean inputIsComplete() { /* ... */ }
    boolean outputIsComplete() { /* ... */ }
    void process() { /* ... */ }
    
    public void run() {
        try {
            if (state == READING) read();
            else if (state == SENDING) send();
        } catch (IOException ex) { /* ... */ }
    }
    
    void read() throws IOException {
        socket.read(input);
        if (inputIsComplete()) {
            process();
            state = SENDING;
            // Normally also do first write now
            sk.interestOps(SelectionKey.OP_WRITE); // step 3: register interest in write events
        }
    }
    void send() throws IOException {
        socket.write(output);
        if (outputIsComplete()) sk.cancel(); // writing is done; cancel the selection key
    }
}

// The Handler above services both read and write events, hence the state checks inside run().
// The State-Object pattern expresses the same transitions more elegantly:
class Handler { // ...
    public void run() { // initial state is reader
        socket.read(input);
        if (inputIsComplete()) {
            process();
            sk.attach(new Sender()); // state transition: after reading, attach a Sender as the new callback object
            sk.interestOps(SelectionKey.OP_WRITE);
            sk.selector().wakeup();
        }
    }
    class Sender implements Runnable {
        public void run(){ // ...
            socket.write(output);
            if (outputIsComplete()) sk.cancel();
        }
    }
}

This is the Reactor pattern.

Key concepts of the Reactor pattern:

Reactor: responds to I/O events; when it detects a new event, it dispatches the event to the corresponding Handler.

Handler: performs the actual non-blocking work and represents a resource the system manages; each handler is bound to an event.

The Reactor here is a single thread: it must accept connections and also dispatch requests to the handlers.

Because there is only that one thread, the business logic inside each handler has to complete quickly.

Improvement: run the business logic on multiple threads.

Worker Thread Pools

Reference code:

class Handler implements Runnable {
    // uses util.concurrent thread pool
    static PooledExecutor pool = new PooledExecutor(...);
    static final int PROCESSING = 3;
    // ...
    synchronized void read() { // ...
        socket.read(input);
        if (inputIsComplete()) {
            state = PROCESSING;
            pool.execute(new Processer()); // hand off to the thread pool for asynchronous execution
        }
    }
    
    synchronized void processAndHandOff() {
        process();
        state = SENDING; // or rebind attachment
        sk.interestOps(SelectionKey.OP_WRITE); // processing done; now wait for the write event
    }
    
    class Processer implements Runnable {
        public void run() { processAndHandOff(); }
    }
}

Handler processing is handed off to a thread pool, so the business logic runs on multiple threads, while the Reactor itself remains a single thread.
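PooledExecutor belongs to the pre-JDK5 util.concurrent library the slides were written against; since Java 5 the same hand-off is normally written with java.util.concurrent. A minimal sketch of the equivalent wiring (sk, state and process() are assumed from the Handler listing above, and the pool size is a placeholder):

import java.nio.channels.SelectionKey;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class PooledHandoff {
    static final ExecutorService pool = Executors.newFixedThreadPool(8); // placeholder size
    // members assumed from the earlier Handler sketch:
    static final int PROCESSING = 3, SENDING = 1;
    volatile int state;
    SelectionKey sk;
    void process() { /* ... */ }

    void read() { // runs on the Reactor thread
        state = PROCESSING;
        pool.execute(() -> {                       // run process() off the Reactor thread
            process();
            state = SENDING;
            sk.interestOps(SelectionKey.OP_WRITE); // now wait for the write event
            sk.selector().wakeup();                // a blocked select() won't see the change until it wakes
        });
    }
}

The wakeup() matters: an interestOps() change made from a worker thread only takes effect at the selector's next selection operation, so the worker has to nudge it.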

Further improvement: on multi-CPU machines, split the Reactor into two parts to make full use of the hardware.

Using Multiple Reactors

Reference code:

Selector[] selectors; // the set of subReactors; each selector represents one subReactor
int next = 0;
class Acceptor { // ...
    public synchronized void run() { ...
        Socket connection = serverSocket.accept(); // the main selector handles accept
        if (connection != null)
            new Handler(selectors[next], connection); // round-robin: pick a subReactor to own this connection
        if (++next == selectors.length) next = 0;
    }
}

The mainReactor listens for and accepts connections, then hands each accepted connection to a subReactor. Why dedicate a Reactor just to accepting? Because establishing a connection is itself costly — TCP, for instance, requires a three-way handshake — so isolating that work in its own Reactor improves overall performance.
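A minimal sketch of how the subReactors might be wired up — each one runs the same select/dispatch loop as the single Reactor shown earlier, on its own thread (SUB_COUNT is a placeholder choice):

import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;

class SubReactorPool {
    static final int SUB_COUNT = Runtime.getRuntime().availableProcessors();
    final Selector[] selectors = new Selector[SUB_COUNT];

    SubReactorPool() throws IOException {
        for (int i = 0; i < SUB_COUNT; i++) {
            final Selector sel = Selector.open();
            selectors[i] = sel;
            new Thread(() -> eventLoop(sel)).start(); // one thread per subReactor
        }
    }

    // the same dispatch loop as the single-threaded Reactor above
    static void eventLoop(Selector sel) {
        try {
            while (!Thread.interrupted()) {
                sel.select();
                Iterator<SelectionKey> it = sel.selectedKeys().iterator();
                while (it.hasNext()) {
                    Runnable r = (Runnable) it.next().attachment();
                    if (r != null) r.run();
                    it.remove();
                }
            }
        } catch (IOException ex) { /* ... */ }
    }
}

The Acceptor from the snippet above then spreads accepted connections across these selectors round-robin, which is exactly what new Handler(selectors[next], connection) does.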

 

Reference:

http://www.cnblogs.com/fxjwind/p/3363329.html

 



Reactor: high-performance concurrency in one line of code

Author: 链上研发-ouyang
Date: 2016-12-02

Reactive programming is a paradigm oriented around data streams and the propagation of change. It combines best practices from the observer pattern, the iterator pattern, and functional programming. The library hides considerable machinery under the hood to keep application code simple, which is what makes high-performance concurrency in a single line possible.
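A tiny, hypothetical example of that propagation with Reactor's Flux (using the reactor-core dependency shown below): every value the source pushes flows through the operator chain to the subscriber.

import reactor.core.publisher.Flux;

public class FluxHello {
    public static void main(String[] args) {
        Flux.just(1, 2, 3)          // the source pushes values
            .map(n -> n * 10)       // each change propagates down the chain
            .subscribe(n -> System.out.println("got " + n)); // prints 10, 20, 30
    }
}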

Suppose we have a set of n resource IDs, each of which can be resolved to its content with one network request, and we need to fetch the content for every ID.

Here is a quick simulation; the concurrent code:

    /**
     * Execute in parallel
     * @param concurrent degree of parallelism
     * @param sleeps simulated pause times
     * @return the results, as an Iterator
     */
    public Iterator<String> list(int concurrent, Long... sleeps){
        return Flux.fromArray(sleeps)
                .flatMap(sleep -> Mono.fromCallable( () -> mockHttp(sleep)).subscribeOn(Schedulers.elastic()), concurrent)
                .toIterable().iterator();
    }

That is the whole thing: code that executes n requests with bounded concurrency, in just one line. (The second argument to flatMap caps how many inner Monos run at once.)

Dependency:

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.0.2.RELEASE</version>
</dependency>

The complete test code:

import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class ReactorTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(ReactorTest.class);


    /**
     * A quick test
     */
    @Test
    public void concurrentTest(){

        // No real purpose by itself: it just warms up Schedulers.elastic() so its thread pool can be reused below, to avoid writing extra code.
        Flux.range(1,100).map(a -> a*1)
                .subscribeOn(Schedulers.elastic())
                .subscribe();

        // start the test
        long start = System.currentTimeMillis();


        // first argument: 20 = up to 20 concurrent requests
        // the remaining arguments are the N requests; the longest may take 2000 ms
        list(20, 1000l,2000l,100l,200l,300l,400l,500l,600l,700l,800l,900l)
                .forEachRemaining( show ->  LOGGER.info(show) );

        LOGGER.info("总时间 : {} ms", System.currentTimeMillis() - start );

    }

    /**
     * Execute in parallel
     * @param concurrent degree of parallelism
     * @param sleeps simulated pause times
     * @return the results, as an Iterator
     */
    public Iterator<String> list(int concurrent, Long... sleeps){
        return Flux.fromArray(sleeps)
                .log()
                .flatMap(sleep -> Mono.fromCallable( () -> mockHttp(sleep)).subscribeOn(Schedulers.elastic()), concurrent)
                .toIterable().iterator();
    }

    /**
     * Stands in for an HTTP request
     * @param sleep the request duration
     * @return
     */
    public String mockHttp(long sleep){
        try {
            Thread.sleep(sleep);
            LOGGER.info("停顿{}ms真的执行了", sleep);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return String.format("paused for %sms", sleep);
    }

}

Now look at the output:

17:10:59.663 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
17:10:59.748 [main] INFO reactor.Flux.Array.1 - | onSubscribe(reactor.core.publisher.FluxArray$ArraySubscription@614ddd49)
17:10:59.750 [main] INFO reactor.Flux.Array.1 - | request(20)
17:10:59.750 [main] INFO reactor.Flux.Array.1 - | onNext(1000)
17:10:59.811 [main] INFO reactor.Flux.Array.1 - | onNext(2000)
17:10:59.811 [main] INFO reactor.Flux.Array.1 - | onNext(100)
17:10:59.811 [main] INFO reactor.Flux.Array.1 - | onNext(200)
17:10:59.812 [main] INFO reactor.Flux.Array.1 - | onNext(300)
17:10:59.812 [main] INFO reactor.Flux.Array.1 - | onNext(400)
17:10:59.812 [main] INFO reactor.Flux.Array.1 - | onNext(500)
17:10:59.812 [main] INFO reactor.Flux.Array.1 - | onNext(600)
17:10:59.813 [main] INFO reactor.Flux.Array.1 - | onNext(700)
17:10:59.813 [main] INFO reactor.Flux.Array.1 - | onNext(800)
17:10:59.813 [main] INFO reactor.Flux.Array.1 - | onNext(900)
17:10:59.813 [main] INFO reactor.Flux.Array.1 - | onComplete()
17:10:59.915 [elastic-4] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - pause of 100ms actually executed
17:10:59.916 [elastic-4] INFO reactor.Flux.Array.1 - | request(1)
17:10:59.918 [main] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - paused for 100ms
17:11:00.016 [elastic-5] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - pause of 200ms actually executed
17:11:00.016 [elastic-5] INFO reactor.Flux.Array.1 - | request(1)
17:11:00.016 [main] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - paused for 200ms
17:11:00.116 [elastic-6] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - pause of 300ms actually executed
17:11:00.116 [main] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - paused for 300ms
17:11:00.116 [elastic-6] INFO reactor.Flux.Array.1 - | request(1)
17:11:00.217 [elastic-7] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - pause of 400ms actually executed
17:11:00.218 [elastic-7] INFO reactor.Flux.Array.1 - | request(1)
17:11:00.219 [main] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - paused for 400ms
17:11:00.318 [elastic-8] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - pause of 500ms actually executed
17:11:00.320 [elastic-8] INFO reactor.Flux.Array.1 - | request(1)
17:11:00.320 [main] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - paused for 500ms
17:11:00.417 [elastic-9] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - pause of 600ms actually executed
17:11:00.418 [elastic-9] INFO reactor.Flux.Array.1 - | request(1)
17:11:00.419 [main] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - paused for 600ms
17:11:00.516 [elastic-10] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - pause of 700ms actually executed
17:11:00.518 [elastic-10] INFO reactor.Flux.Array.1 - | request(1)
17:11:00.523 [main] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - paused for 700ms
17:11:00.614 [elastic-11] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - pause of 800ms actually executed
17:11:00.615 [elastic-11] INFO reactor.Flux.Array.1 - | request(1)
17:11:00.616 [main] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - paused for 800ms
17:11:00.724 [elastic-12] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - pause of 900ms actually executed
17:11:00.724 [main] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - paused for 900ms
17:11:00.724 [elastic-12] INFO reactor.Flux.Array.1 - | request(1)
17:11:00.815 [elastic-2] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - pause of 1000ms actually executed
17:11:00.816 [elastic-2] INFO reactor.Flux.Array.1 - | request(1)
17:11:00.816 [main] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - paused for 1000ms
17:11:01.815 [elastic-3] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - pause of 2000ms actually executed
17:11:01.835 [main] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - paused for 2000ms
17:11:01.835 [main] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - total time : 2116 ms

Process finished with exit code 0
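Worth noting: the eleven simulated requests sleep for 7.5 s in total, yet the whole run finished in 2116 ms — barely longer than the single slowest request (2000 ms) — confirming that they really did execute concurrently.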


Here I want to build on the Reactor pattern, whose guiding principle is one loop per thread — each thread runs its own loop.

The pattern consists of:

      reactor: a reactor thread runs an endless listening loop. Whenever a connection arrives, the reactor finds an idle thread in the thread pool and passes it the socket returned by accept. There should be exactly one reactor thread, so the class supports the singleton pattern; it reaches the thread pool through a pointer it holds by composition.

    The thread pool: as in the thread-pool implementation described in my previous post, the pool keeps three thread lists: m_idle_list for idle threads, m_busy_list for threads currently working, and m_stop_list for threads that take no further part in receiving or dispatching work.

   reactor.h:

#include<pthread.h>
class ServantPool;
class Reactor
{
private:	
	int m_sk;
	pthread_t m_id;	
	static void* reactor_listen(void* m);// contains the listening loop and request dispatch
	ServantPool* m_ser_pool;// from the ServantPool, choose an idle thread to process requests
	Reactor();		
	static Reactor* hInstance;	
public:			
	void SetPoolPtr(ServantPool* ptr);
	static Reactor* getInstance();
	void start();// start the listener thread
	void close_thread();
};

reactor.cpp:

#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<errno.h>

#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include"Servant.h"
#include"reactor.h"
#define ACCEPTING 1
#define BUSY 2
#define STOP 2
#define PORT 2090
#define MAX_CONNECTIONS 100
Reactor* Reactor::hInstance=NULL;
Reactor* Reactor::getInstance()
{
	if(hInstance==NULL)
	{
		hInstance=new Reactor();
	}
	return hInstance;
}
void* Reactor::reactor_listen(void* m)
{
	//this static function runs the listening loop
	Reactor* rec=(Reactor*)m;
	int serSocket=socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
	if(serSocket<0)
	{
		printf("sorry socket error: %s\n",strerror(errno));
		return NULL;
	}
	struct sockaddr_in serAdd;
	memset(&serAdd,0,sizeof(sockaddr_in));
	serAdd.sin_family=AF_INET;
	serAdd.sin_addr.s_addr=htonl(INADDR_ANY);
	serAdd.sin_port=htons(PORT);	
	if(bind(serSocket,(struct sockaddr*)&serAdd,sizeof(serAdd))<0)
	{
		printf("sorry bind error: %s\n",strerror(errno));
	}
	if(listen(serSocket,MAX_CONNECTIONS)<0)
	{
		printf("sorry listen error: %s\n",strerror(errno));
	}
	else
	{
		printf("listen successfully: %s\n",strerror(errno));		
	}
	while(1)
	{
		Servant* cur;
		DZ:
		while(cur=rec->m_ser_pool->Accepting())
		{
			struct sockaddr_in tmpAddr;
			socklen_t size=sizeof(tmpAddr);
			int skt=accept(serSocket,(struct sockaddr*)&tmpAddr,&size);
			if(skt<0)
			{
				printf("accept error  %s\n",strerror(errno));
				return NULL;
			}
			// a request has been accepted; hand its socket to the first idle thread in the pool
			cur->setSocket(skt);
			// notify the thread that an event has arrived so it can wake up and start working
			cur->notify();
		}
                // no idle thread is available right now; spin until one appears
                while((cur=rec->m_ser_pool->Accepting())==NULL)
		{
			if(errno!=0)
				return NULL;
		}
		goto DZ;
	}
	return NULL;
}

Reactor::Reactor()
{
	;
}
void Reactor::SetPoolPtr(ServantPool* ptr)
{
	m_ser_pool=ptr;
}
void Reactor::start()
{
	printf("reactor successfully\n");
        //开启监听线程,也即只有一个监听线程。
        int result=pthread_create(&m_id,NULL,reactor_listen,this);
	if(result==-1)
	{
		printf("the Reactor thread created error \n",strerror(errno));
		return ;
	}
	else
	{
		printf("the Reactor pthread_create success \n");	
	}	
}
void Reactor::close_thread()
{
	m_ser_pool->StopAll();
}

Servant.h, containing the worker thread and the thread pool definitions:

#include<pthread.h>
#include<vector>
using namespace std;
class Reactor;
class ServantPool;
class Servant
{
	private:

		pthread_mutex_t mutex_x;
		pthread_cond_t cond_t;		
		int m_socket;	
		Reactor* m_reactor;
		ServantPool* m_pool;
		int thread_id;
	public:
		int state;
		pthread_t m_tid;
		Servant(ServantPool* pol);
		void start();
	static void* ThreadProc(void* data);
		void notify();
		void stop();
		void wakeup();
		void setSocket(int m);
		void exit();
		void join();
		void Complete();
		void DoJob();
		
};

//ServantPool is supposed to be a singleton
class ServantPool
{
private:
	//running threads that are currently idle
	vector<Servant*> m_idle_list;
	//running threads that are currently working
	vector<Servant*> m_busy_list;
	//stopped threads
	vector<Servant*> m_stop_list;
	ServantPool(){};
	static ServantPool * hInstance;
public:	
	void StopAll();
	void create(int num);
	void startAll();
	void AddtoIdle(bool add,Servant* ser);
	void Addtobusy(bool add,Servant* ser);
	void Addtostop(bool add,Servant* ser);
	void stop(Servant* id);
	void wakeup(Servant* id);
	void waitforAll();
	~ServantPool();	
	static ServantPool * getInstance();	
	Servant* Accepting();
};

Servant.cpp, with the worker thread and the thread pool implementations:

#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<errno.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<unistd.h>
#include"Servant.h"
#define WORKING 1
#define STOP 2
#define EXIT 3
#define IDLE 4
static int num=0;
static 	pthread_mutex_t vec_mutex; 
void Servant::start()
{
	state=IDLE;
	int result=pthread_create(&m_tid,NULL,ThreadProc,this);
	if(0!=result)
	{
		printf("thread create result :%s\n",strerror(errno));
	}
}
Servant::Servant(ServantPool* ma)
{	
	thread_id=num++;
	m_pool=ma;	
	pthread_mutex_init(&mutex_x,NULL);
	pthread_cond_init(&cond_t,NULL);	
}
void Servant::setSocket(int m)
{
	m_socket=m;
}
void Servant::DoJob()
{
	char buf[100]={0};
	recv(m_socket,buf,100,0);
	printf("we haved the %d recv:%s\n",num++,buf);
	close(m_socket);
}
void* Servant::ThreadProc(void* data)
{	
	Servant* ser=(Servant*)data;
	int result=0;
	while(ser->state!=EXIT)
	{
		while(ser->state==IDLE)
		{
			result=pthread_mutex_lock(&(ser->mutex_x));
			if(0==result)
			{
				printf("waiting for the mutex \n");
			}			
			result=pthread_cond_wait(&(ser->cond_t),&(ser->mutex_x));	
			if(ser->state!=IDLE)
				goto End;
				
			printf("the conditions has been notified\n");
			//we take this thread to busy list
			ser->m_pool->AddtoIdle(false,ser);
			ser->m_pool->Addtobusy(true,ser);
			// really   work
			
			ser->state=WORKING;			
			printf("Do Something...\n");
			ser->DoJob();
			ser->Complete();// Complete() moves this thread back to the idle list
			End:
			pthread_mutex_unlock(&(ser->mutex_x));					
		}
	}
	return NULL;
}
void Servant::stop()
{
	if(state==IDLE)		
	{
		m_pool->AddtoIdle(false,this);
		m_pool->Addtostop(true,this);
		state=STOP;
		printf("thread stop!\n");		
	}
	else if(state==WORKING)
	{
		printf("current state is WORKING stop failed!\n");				
	}
	else if(state==STOP)
	{
		printf("thread already stopped!\n");				
	}
	else 
	{
		printf("sorry unknown state!\n");
		state=STOP;
	}
}
void Servant::wakeup()
{
	if(state==STOP)		
	{
		m_pool->Addtostop(false,this);
		m_pool->AddtoIdle(true,this);
		state=IDLE;	
		printf("thread wakeup!\n");			
	}
	else if(state==WORKING)
	{
		printf("current state  is WORKING stop failed!\n");
	}
	else if(state==IDLE)
	{
		printf("current state  is idle never need wakeup!\n");
	}
	else
	{
		printf("sorry unknown state..\n");
		state=IDLE;
	}
}
void Servant::Complete()// work finished
{
	//task done: this thread becomes idle again		
	m_pool->Addtobusy(false,this);
	m_pool->AddtoIdle(true,this);	
	state=IDLE;
}
void Servant::join()
{
	pthread_join(m_tid,NULL);
}
void Servant::notify()
{
	if(state==IDLE)
	{		
		printf("we have notified thread running\n");
		pthread_cond_signal(&cond_t);	
	}
	else
	{
		printf("sorry ,the signal is not correct\n");
	}
}
void Servant::exit()
{
	state=EXIT;
	pthread_cond_signal(&cond_t);	
}




void ServantPool::StopAll()
{	
	// stop() moves threads between lists, which would invalidate a live
	// iterator, so iterate over snapshots of the lists instead
	vector<Servant*> idle_copy=m_idle_list;
	vector<Servant*>::iterator itr=idle_copy.begin();	
	for(;itr!=idle_copy.end();itr++)
	{
		(*itr)->stop();		
	}
	vector<Servant*> busy_copy=m_busy_list;
	itr=busy_copy.begin();
	for(;itr!=busy_copy.end();itr++)
	{
		(*itr)->stop();		
	}	
}
void ServantPool::create(int num)
{
	int i=0;
	for(;i<num;i++)
	{
		Servant* tmp=new Servant(this);
		m_idle_list.push_back(tmp);
	}
}
void ServantPool::AddtoIdle(bool add,Servant* ser)
{
	if(add)
	{
	    // add ser to idle list
		pthread_mutex_lock(&vec_mutex);
		m_idle_list.push_back(ser);
		pthread_mutex_unlock(&vec_mutex);
	}
	else
	{
		// del ser from idle list
		pthread_mutex_lock(&vec_mutex);
		vector<Servant*>::iterator itr=m_idle_list.begin();
		for(;itr!=m_idle_list.end();itr++)
		{
			if(*itr==ser)
			{
				m_idle_list.erase(itr);
				break;
			}
		}
		pthread_mutex_unlock(&vec_mutex);
	}
}
void ServantPool::Addtobusy(bool add,Servant* ser)
{
	if(add)
	{
	    // add ser to busy list
		pthread_mutex_lock(&vec_mutex);
		m_busy_list.push_back(ser);
		pthread_mutex_unlock(&vec_mutex);
	}
	else
	{
		// del ser from busy list
		pthread_mutex_lock(&vec_mutex);
		vector<Servant*>::iterator itr=m_busy_list.begin();
		for(;itr!=m_busy_list.end();itr++)
		{
			if(*itr==ser)
			{
				m_busy_list.erase(itr);
				break;
			}
		}
		pthread_mutex_unlock(&vec_mutex);
	}
}
void ServantPool::Addtostop(bool add,Servant* ser)
{
	if(add)
	{
	    // add ser to stop list
		pthread_mutex_lock(&vec_mutex);
		m_stop_list.push_back(ser);
		pthread_mutex_unlock(&vec_mutex);
	}
	else
	{
		// del ser from stop list
		pthread_mutex_lock(&vec_mutex);
		vector<Servant*>::iterator itr=m_stop_list.begin();
		for(;itr!=m_stop_list.end();itr++)
		{
			if(*itr==ser)
			{
				m_stop_list.erase(itr);
				break;
			}
		}
		pthread_mutex_unlock(&vec_mutex);
	}
}
void ServantPool::startAll()
{
	int i=0;
	for(;i<m_idle_list.size();i++)
	{
		Servant* tmp=m_idle_list[i];
		printf("start the thread %d\n",i);
		tmp->start();//create the thread
	}
}

void ServantPool::stop(Servant* id)
{	
	vector<Servant*>::iterator itr=m_idle_list.begin();	
	for(;itr!=m_idle_list.end();itr++)
	{
		if((*itr)==id)
		{	
			(*itr)->stop();			
			return;
		}
	}
}

void ServantPool::waitforAll()
{
	int i=0;
	int nums=m_busy_list.size();
	for(;i<nums;i++)
	{			
		Servant* tmp=m_busy_list[i];
		tmp->join();
	}
	nums=m_idle_list.size();
	i=0;
	for(;i<nums;i++)
	{			
		Servant* tmp=m_idle_list[i];
		tmp->join();
	}	
}

void ServantPool::wakeup(Servant* id)
{	
	vector<Servant*>::iterator itr=m_busy_list.begin();	
	for(;itr!=m_busy_list.end();itr++)
	{
		if((*itr)==id)
		{	
			(*itr)->wakeup();			
			return;
		}
	}
}
ServantPool * ServantPool::hInstance=NULL;
ServantPool * ServantPool::getInstance()
{	
	if(NULL==hInstance)
	{
		hInstance=new ServantPool();
		pthread_mutex_init(&vec_mutex,NULL);
	}
	return hInstance;		
}
ServantPool::~ServantPool()
{
	vector<Servant*>::iterator itr=m_idle_list.begin();
	for(;itr!=m_idle_list.end();)
	{		
		(*itr)->exit();				
		delete *itr;		
		itr=m_idle_list.erase(itr);		
	}
	
	itr=m_busy_list.begin();
	for(;itr!=m_busy_list.end();)
	{		
		(*itr)->exit();				
		delete *itr;		
		itr=m_busy_list.erase(itr);		
	}	
	
	itr=m_stop_list.begin();
	for(;itr!=m_stop_list.end();)
	{		
		(*itr)->exit();				
		delete *itr;		
		itr=m_stop_list.erase(itr);		
	}	
}
Servant* ServantPool::Accepting()
{
	if(m_idle_list.size()>0)
	{
		return m_idle_list[0];
	}
	return NULL;
}
And finally the makefile:

test:Servant.o reactor.o main.o
	g++ -o   test Servant.o reactor.o main.o -lpthread
Servant.o:Servant.cpp Servant.h
	g++ -g -c  Servant.cpp -lpthread
reactor.o:reactor.cpp reactor.h
	g++ -g -c  reactor.cpp -lpthread
main.o:main.c Servant.h
	g++ -g -c  main.c -lpthread
clean:
	rm *.o test
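The makefile references a main.c that is not shown; judging from the headers above, it presumably obtains the ServantPool and Reactor singletons, calls create() and startAll() on the pool, wires the pool into the reactor with SetPoolPtr(), starts the reactor with start(), and then waits on the workers with waitforAll().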


One point to note: every operation on the three lists m_idle_list, m_busy_list and m_stop_list must be atomic, and that atomicity is guaranteed by the static global mutex vec_mutex.

This completes the reactor pattern implementation. Code download:


The server side:
Function: keeps every client's registration info and online state, assigns ports centrally, and notifies the process-control module about clients that have dropped offline.
ServerService.h

 

#ifndef _SERVERSERVICE_H_
#define _SERVERSERVICE_H_
#include <map>
#include <string>
#include <sstream>
#include <fstream>
#include "ace/TP_Reactor.h"
#include "ace/SOCK_Dgram.h"
#include "ace/Task.h"
namespace YB
{
const static ACE_UINT32 iLocalHostPort = 8001;
typedef unsigned  char BYTE;
static ACE_Thread_Mutex slock;
typedef struct STAPPID_
{
std::string  sIp;               // client IP
int    iPort;               // port the client used to connect to the server
int    ClientAcceptPort;            // listen port the server assigned to the client
BYTE   byAppid;              // module identifier
BYTE         byGroup;              // group number
int          iTime;               // tick counter used to track the client's online state
bool   bOnline;              // online flag
}STAppid;
/*
Server-side UDP send/receive
*/
class CMain;
class CServerService: public ACE_Event_Handler
{
public:
// constructor / destructor
CServerService();
~CServerService();
virtual ACE_HANDLE get_handle(void) const;         // inherited from the base class
virtual int  handle_input(ACE_HANDLE fd = ACE_INVALID_HANDLE);    // network data has arrived
virtual int  handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask); // stop listening on the network

bool Open(CMain* cm);
void SendToAll(char* buf, int nSize);         // broadcast a message
int  DeleteAppid(BYTE byappid, std::string sip);       // remove a given client's info
bool ReadServerIP();              // read the configured IP
private:
void UpdateClientAllSatte(STAppid stAppid, ACE_INET_Addr taddr);   // push all client states to a newcomer
void UpdateState(YB::BYTE byappid);          // mark a client online
void MsgData(YB::BYTE byappid);           // message frame
void ChackProtocol(const char* buf, const int nSize);     // parse a frame
void ApplyConn(const char *buf, const int nSize);      // answer a client's connection request
void AllotPort(unsigned short &uiPort);         // check the assigned client port for duplicates and pick a fresh one
BYTE checkFrame(unsigned char* uc_buf, unsigned short uc_length);  // frame checksum
void fixFrame(unsigned char* uc_buf, unsigned char& uc_Length,   // frame assembly
      unsigned char flag, unsigned char dest_addr, unsigned char src_addr);
    void    CheckAppid(BYTE byappid,std::string sIp);
public:
std::map<BYTE, STAppid> mpInfo;            // table of registered clients
private:
ACE_INET_Addr addr;
ACE_SOCK_Dgram udp;
std::string  sServerIP;
unsigned short usiPort;             // next port value to assign to a client
STAppid   stAppid;
int    iPortCount;
CMain*   cmn;
};

// Timer class that watches online clients
class CTaskTimer : public ACE_Task_Base
{
public:
// constructor / destructor
CTaskTimer(){timeid = 0;};
virtual ~CTaskTimer(){};
virtual int  handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask); // stop the timer
public:
bool Open(CServerService *udp);
int  handle_timeout(const ACE_Time_Value &current_time, const void *act = 0);// timer callback: purge offline clients

private:
CServerService *sudp;
long   timeid;
};
// Driver class: starts the timer and the network listener
class CMain : public ACE_Task_Base
{
public:
// constructor / destructor
CMain(){};
~CMain(){};
public:
bool Open();
int  Close();
private:
CServerService *serudp;
CTaskTimer  *taskTimer;
};
}
#endif  // end of _SERVERSERVICE_H_

 

 

ServerService.cpp
#include "./ServerService.h"
using namespace YB;
bool CMain::Open()
{
ACE_NEW_RETURN(serudp, CServerService, false);
ACE_NEW_RETURN(taskTimer, CTaskTimer, false);
serudp->reactor(ACE_Reactor::instance());
if (!serudp->Open(this)) return false;
taskTimer->reactor(ACE_Reactor::instance());
if (!taskTimer->Open(serudp)) return false;
ACE_OS::sleep(ACE_Time_Value(0, 10000));   // wait 10 ms
ACE_Reactor::instance()->run_reactor_event_loop(); // run the reactor event loop
return true;
}
int CMain::Close()
{
taskTimer->handle_close(ACE_INVALID_HANDLE, 0);
serudp->handle_close(ACE_INVALID_HANDLE, 0);
if (ACE_Reactor::instance()->reactor_event_loop_done() != 1)
{
  ACE_Reactor::instance()->end_reactor_event_loop();
}
return 0;
}  
///////////////////////////////////////////
CServerService::CServerService()
{
usiPort = 20000;   // initial client port value
iPortCount = 0;
}
CServerService::~CServerService()
{
}
bool CServerService::ReadServerIP()
{
std::ifstream fle("ServerIp.txt", std::ios::in);
if (!fle) return false;

std::ostringstream seamServerIP;
seamServerIP<<fle.rdbuf();
sServerIP = seamServerIP.str();
fle.close();
return true;
}
bool CServerService::Open(CMain* cm)
{
if (!ReadServerIP()) return false;
this->addr.set(iLocalHostPort, sServerIP.c_str());//, ACE_LOCALHOST
this->udp.open(addr);
this->reactor()->register_handler(this, ACE_Event_Handler::READ_MASK);
cmn = cm;
return true;
}
ACE_HANDLE CServerService::get_handle() const
{
return this->udp.get_handle();
}
int  CServerService::handle_input(ACE_HANDLE)
{
ACE_INET_Addr taddr;
char   buf[255] = {0};
int    isize = 0;
isize = this->udp.recv(buf, 255, taddr);
stAppid.iPort = taddr.get_port_number();
stAppid.sIp = taddr.get_host_addr();

if (isize > 0 && isize < 255) ChackProtocol(buf, isize);
return 0;
}
int  CServerService::handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
{
if (this->udp.get_handle() != ACE_INVALID_HANDLE)
{
  ACE_Reactor_Mask m = ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL;
  this->reactor()->remove_handler(this, m);
  this->udp.close();  
}
delete this;
return 0;
}
unsigned char CServerService::checkFrame(unsigned char* uc_buf, unsigned short uc_length)            
{
//verify the frame
return 1;
}
void CServerService::fixFrame( unsigned char* uc_buf,
        unsigned char& uc_Length,
        unsigned char flag,
        unsigned char dest_addr,
        unsigned char src_addr)
{
//assemble the frame
return ;
}
void CServerService::ChackProtocol(const char* buf, const int nSize)
{
YB::BYTE *p = (YB::BYTE*)buf;

if (checkFrame(p, nSize)) return;
switch (*(p + 11))
{
  case 0x00:  // heartbeat
  {
   
   UpdateState(*(p + 6));
   if (*(p + 2) == 0x02) MsgData(*(p + 6));
   break;
   
  }   
  case 0x01:  // message types this module handles
  {
   switch (*(p + 15))
   {
    case 0x00:  // clean exit: mark the client offline
    {
     DeleteAppid(*(p + 23), stAppid.sIp);
     break;
    }
    case 0x02: // connection request
    {
     ApplyConn(buf, nSize);
     break;
    }
    default:
     break;
   }
   break;
  }
  case 0x02: // shutdown
  {
   if (*(p + 15) == 0x04 && *(p + 6) == 0x01) cmn->Close();
   break;
  }
  default:
   break;  
}
}
void CServerService::ApplyConn(const char *buf, const int nSize)
{

ACE_INET_Addr taddr;
YB::BYTE isize = 0x0C;
char  puf[255] = {0};
char *p = (char*)buf;  
AllotPort(usiPort);
stAppid.ClientAcceptPort = usiPort;
stAppid.byAppid = *(p + 6);
stAppid.byGroup = *(p + 16);
CheckAppid( stAppid.byAppid,stAppid.sIp.c_str() );
taddr.set(usiPort, stAppid.sIp.c_str());
ACE_UINT32 ip = taddr.get_ip_address();
u_short  iprt = taddr.get_port_number();
/* frame assembly
strcpy(puf, "/x01/x01/x01/x0C/x03");
puf[5] = stAppid.byGroup;
memcpy(puf + 6, &ip, sizeof(ACE_UINT32));
memcpy(puf + 6 + sizeof(ACE_UINT32), &iprt, sizeof(u_short));
fixFrame((unsigned char*)puf, isize, 0x01, stAppid.byAppid, 0x04);
taddr.set(stAppid.iPort, stAppid.sIp.c_str());
*/
this->udp.send(puf, isize, taddr);
/*
// push the update to the other clients
isize = 0x0D;
memset(puf, 0x00, 255);
strcpy(puf, "/x01/x01/x01/x0D/x01");
puf[5] = stAppid.byGroup;
memcpy(puf + 6, &ip, sizeof(ACE_UINT32));
memcpy(puf + 6 + sizeof(ACE_UINT32), &iprt, sizeof(u_short));
memcpy(puf + 6 + sizeof(ACE_UINT32) + sizeof(u_short), &stAppid.byAppid, sizeof(YB::BYTE));
fixFrame((unsigned char*)puf, isize, 0x01, 0x00, 0x04);
*/
SendToAll(puf, isize);
// push the existing-client table to the newly added client
UpdateClientAllSatte(stAppid, taddr);
// add the new client to the table
stAppid.iTime = 1;
stAppid.bOnline = true;
slock.acquire();
mpInfo.insert(std::make_pair(stAppid.byAppid, stAppid));
slock.release();
}
void CServerService::CheckAppid(YB::BYTE byappid,std::string sIp)
{
std::map<YB::BYTE, STAppid>::iterator mpIter;
for (mpIter = mpInfo.begin(); mpIter != mpInfo.end(); mpIter++)   
{
  if (byappid == mpIter->second.byAppid &&
   sIp == mpIter->second.sIp )
  {
   DeleteAppid(byappid,sIp);
   ACE_OS::sleep(1);
  }   
}
}
void CServerService::AllotPort(unsigned short &uiPort)
{
if (uiPort > 65500)
{
  uiPort = 20000;
  iPortCount++;
}
if (iPortCount < 1)
{
  uiPort++; //advance the assigned port number
}
else
{
  std::map<YB::BYTE, STAppid>::iterator mpIter;
  for (mpIter = mpInfo.begin(); mpIter != mpInfo.end(); mpIter++)   
  {
   if (uiPort == mpIter->second.ClientAcceptPort)
   {
    uiPort++;
    mpIter = mpInfo.begin();
   }   
  }
}
}
int CServerService::DeleteAppid(YB::BYTE byappid, std::string sip)
{
std::map<YB::BYTE, STAppid>::iterator mpIter;
YB::BYTE isize = 0x0D;
bool  b_isfind = false;
char  puf[255] = {0};
slock.acquire();
for (mpIter = mpInfo.find(byappid); mpIter != mpInfo.end(); mpIter++)   
{
  if (mpIter->first != byappid) break;
  if (mpIter->second.sIp != sip) continue;
  
  ACE_INET_Addr taddr(mpIter->second.ClientAcceptPort, sip.c_str());
  ACE_UINT32  ip = taddr.get_ip_address();
  u_short   iprt = taddr.get_port_number();
  /* frame assembly
  memset(puf, 0, 255);
  isize = 0x0D;
  strcpy(puf, "/x01/x01/x01/x0D");
  puf[5] = mpIter->second.byGroup;
  memcpy(puf + 6, &ip, sizeof(ACE_UINT32));
  memcpy(puf + 6 + sizeof(ACE_UINT32), &iprt, sizeof(u_short));
  memcpy(puf + 6 + sizeof(ACE_UINT32) + sizeof(u_short), &mpIter->second.byAppid, sizeof(YB::BYTE));
  fixFrame((unsigned char*)puf, isize, 0x01, 0x00, 0x04);
*/
  mpInfo.erase(mpIter);
  b_isfind = true;
  break;
}
slock.release();
// broadcast to every client
if (b_isfind) SendToAll(puf, isize);
return 0;
}
void CServerService::SendToAll(char* buf, int nSize)
{
std::map<YB::BYTE, STAppid>::iterator mpIter;
ACE_INET_Addr taddr;
for (mpIter = mpInfo.begin(); mpIter != mpInfo.end(); mpIter++)   
{
  taddr.set(mpIter->second.iPort, mpIter->second.sIp.c_str());
  int bychar = (unsigned char)buf[10] - mpIter->second.byAppid;
  buf[10] = mpIter->second.byAppid;
  buf[nSize - 2] -= bychar;
  this->udp.send(buf, nSize, taddr);
}
}
void CServerService::UpdateState(YB::BYTE byappid)  // mark the client online
{
std::map<YB::BYTE, STAppid>:: iterator mpIter;

for (mpIter = mpInfo.find(byappid); mpIter != mpInfo.end(); mpIter++)
{
  if (mpIter->first != byappid) break;
  if (stAppid.sIp != mpIter->second.sIp) continue;
  mpIter->second.iTime = 1;
  break;
}
}
void CServerService::MsgData(YB::BYTE byappid)
{
std::map<YB::BYTE, STAppid>::iterator mpIter;
for (mpIter = mpInfo.find(byappid); mpIter != mpInfo.end(); mpIter++)  
{
  if (mpIter->first != byappid) break;
  if (mpIter->second.sIp != stAppid.sIp) continue;
  ACE_INET_Addr taddr(stAppid.iPort, stAppid.sIp.c_str());
  char   puf[255] = {0};
  unsigned char iszie = 0x04;
  strcpy(puf, "/xf0/x01/x01/x04");
  fixFrame((unsigned char*)puf, iszie, 0x01, byappid, 0x04);
  this->udp.send(puf, iszie, taddr);
  break;
}
}
void CServerService::UpdateClientAllSatte(STAppid stAppid, ACE_INET_Addr taddr)
{
std::map<YB::BYTE, STAppid>::iterator mpIter;
ACE_INET_Addr taddr1;
unsigned char isize = 0x0D;
ACE_UINT32  ip = 0;
u_short   iprt = 0;
char   puf[255] = {0};
  
ACE_Time_Value t(0, 100000);
ACE_OS::sleep(t);
for (mpIter = mpInfo.begin(); mpIter != mpInfo.end(); mpIter++)  
{
  taddr1.set(mpIter->second.ClientAcceptPort, mpIter->second.sIp.c_str());  
  ip = taddr1.get_ip_address();
  iprt = taddr1.get_port_number();
  /*
  memset(puf, 0, 255);
  isize = 0x0D;
  strcpy(puf, "/x01/x01/x01/x0D/x01");
  puf[5] = mpIter->second.byGroup;
  memcpy(puf + 6, &ip, sizeof(ACE_UINT32));
  memcpy(puf + 6 + sizeof(ACE_UINT32), &iprt, sizeof(u_short));
  memcpy(puf + 6 + sizeof(ACE_UINT32) + sizeof(u_short), &mpIter->second.byAppid, sizeof(YB::BYTE));
  fixFrame((unsigned char*)puf, isize, 0x01, stAppid.byAppid, 0x04);
  */
  this->udp.send(puf, isize, taddr);
  t.set(0, 10000);
  ACE_OS::sleep(t);
}
}
//////////////////////////////////////////////
///*
bool  CTaskTimer::Open(CServerService *udp)
{
sudp = udp;
ACE_Time_Value idlay(1);
ACE_Time_Value ival(60);
timeid = this->reactor()->schedule_timer(this, 0, idlay, ival);
return true;
}
int CTaskTimer::handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
{
if (timeid) this->reactor()->cancel_timer(this);
delete this;
return 0;
}
int CTaskTimer::handle_timeout(const ACE_Time_Value &current_time, const void *act)
{
std::map<YB::BYTE, STAppid>::iterator mpIter;
for (mpIter = sudp->mpInfo.begin(); mpIter != sudp->mpInfo.end(); mpIter++)   
{
  mpIter->second.bOnline = mpIter->second.iTime ? true : false;
  mpIter->second.iTime = 0;
}
// purge clients that have gone offline
for (mpIter = sudp->mpInfo.begin(); mpIter != sudp->mpInfo.end(); mpIter++)   
{
  if (!mpIter->second.bOnline)
  {
   sudp->DeleteAppid(mpIter->second.byAppid, mpIter->second.sIp);
   mpIter = sudp->mpInfo.begin();
   if (mpIter == sudp->mpInfo.end()) break;
  }
}
return 0;
}

 

The driver program

main.cpp
#include "./ServerService.h"
int ACE_TMAIN(int argc, char* argv[])
{
YB::CMain cmain;
if (!cmain.Open()) cmain.Close();
return 0;
}

 

 

The client DLL
Function: connects to other clients according to module ID and forwards data.
DLL.cpp

 

#include "./NetWork.h"
#include "./os_fun.h"
class _mydllexport CBaseInNetwork
{
public:
CBaseInNetwork();
virtual ~CBaseInNetwork();
virtual bool InitNetwork(unsigned  char byAppid, int igroup ,int itype);
virtual void CloseNetwork();
virtual int SendData(const unsigned char* buf, const int nSize, unsigned  char byAppid );
virtual int GetDataLength();
virtual int GetData(unsigned char* buf);
private:

CNetWork *pnet;
};
CBaseInNetwork::CBaseInNetwork()
{
}
CBaseInNetwork::~CBaseInNetwork()
{
}
bool CBaseInNetwork::InitNetwork( unsigned  char byAppid,  int igroup ,int itype)
{
ACE::init();
bool bRetVal = false;
pnet =  new CNetWork();
bRetVal = pnet->Open( byAppid, igroup , itype );
return bRetVal;
}
int CBaseInNetwork::SendData(const unsigned char* buf, const int nSize, unsigned  char byAppid )
{
return pnet->SendData( buf, nSize, byAppid );
}
int CBaseInNetwork::GetDataLength()
{
return pnet->GetDataLength();
}
int CBaseInNetwork::GetData(unsigned char* buf)
{
return pnet->GetData( buf );
}
void CBaseInNetwork::CloseNetwork( )
{
pnet->Close();
delete pnet;
pnet = NULL;
ACE::fini();
}
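Judging from the implementations above, the expected call order is: InitNetwork once at startup (it initializes ACE and brings the network up), then SendData / GetDataLength / GetData as needed, and finally CloseNetwork, which shuts the network down and finalizes ACE.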

 

os_fun.h
#ifndef _OS_FUN_H_
#define _OS_FUN_H_
#ifdef WIN32
#include "./win32_fun.hpp"
#endif
#ifdef linux
#include "./linux_fun.hpp"
#endif
#endif // end of _OS_FUN_H_

 

win32_fun.hpp
#ifndef __WIN32_FUN_H
#define __WIN32_FUN_H
#include <windows.h>
#define _mydllexport extern "C" _declspec(dllexport)
BOOL WINAPI DllMain(HANDLE hModule,
     DWORD  ul_reason_for_call,
     LPVOID lpReserved
     )
{
switch (ul_reason_for_call)
{
case DLL_PROCESS_ATTACH:
case DLL_THREAD_ATTACH:
case DLL_THREAD_DETACH:
case DLL_PROCESS_DETACH:
  break;
}
    return TRUE;
}
#endif // end of __WIN32_FUN_H

 

 

linux_fun.hpp
#ifndef __LINUX_FUN_H
#define __LINUX_FUN_H
#define _mydllexport  
#endif // end of __LINUX_FUN_H

 

BaseNetWork.h

#ifndef _BASENETWORK_H_
#define _BASENETWORK_H_
namespace YB
{
class CBaseNetWork
{
protected:
CBaseNetWork(void){};
public:
virtual ~CBaseNetWork(void){};
	virtual bool Open(unsigned  char byAppid, int igroup = 0, int itype = 0) = 0; //byAppid identifies this module
virtual int SendData(const unsigned char* buf, const int nSize, unsigned  char byAppid = 0xff) = 0;
virtual int GetData(unsigned char* buf) = 0;
virtual int GetDataLength() = 0;
virtual void Close() = 0;

};
}
#endif // end of _BASENETWORK_H_

 

 

#ifndef _NETWORK_H_
#define _NETWORK_H_
#include <map>
#include <list>
#include <string>
#include <fstream>
#include <sstream>
#include "ace/TP_Reactor.h"
#include "ace/SOCK_Dgram.h"
#include "ace/SOCK_Acceptor.h"
#include "ace/SOCK_Stream.h"
#include "ace/SOCK_Connector.h"
#include "ace/Task.h"
const static ACE_UINT32 iServerPort = 8001;
typedef unsigned  char Byte;
static ACE_Thread_Mutex mlocka;
static ACE_Thread_Mutex mlock_mp;
const static Byte ServerAppid = 0x04;
class CNetWork;
/*
Maintains the connection to the server; sends and receives data
*/
class CServerUdpcs : public ACE_Event_Handler
{
public:
// constructor / destructor
CServerUdpcs(){};
~CServerUdpcs(){};
virtual ACE_HANDLE get_handle()const;
virtual int  handle_input(ACE_HANDLE fd = ACE_INVALID_HANDLE);    // network read event
virtual int  handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask); // close the network event handler
public:
int  Open_Event();          // register for events
bool Open(Byte byappid, Byte bygroup);
int  SendData(const char* buf, const int nSize);
int  GetData(char* buf, const int nSize);
void SetParentHander(CNetWork *p);      // set the owner pointer, for access to the shared data lists

public:
int  iClientStateSign;         // client-state-change flag; 1 = changed, 0 = unchanged
private:
void ChackProtocol(const char* buf, const int nSize); // parse a frame
void UpdateMapInfo(const char* buf, const int nSize); // update client state
void ReadDataToList(const char* buf, const int nSize); // append received data to the list
void SaveMapinfo(const char* buf);      // record other clients' info sent by the server
void SaveLocalhost(const char* buf);      // record the local IP and the server-assigned port
int  GetSourcePort(const char* buf);      // extract the port
std::string GetSourceIp(const char* buf);     // extract the IP address
private:
ACE_INET_Addr praddr;
ACE_SOCK_Dgram udp; // UDP socket
CNetWork  *net;
};
/*
The client's own listening service
*/
class CClientAcceptor : public ACE_Event_Handler
{
public:
// constructor / destructor
CClientAcceptor(){};
~CClientAcceptor(){};
virtual ACE_HANDLE get_handle()const;
virtual int  handle_input(ACE_HANDLE fd = ACE_INVALID_HANDLE);    // accept a client connection
virtual int  handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask); // close the network event handler
public:
int  open(void* avg = 0);
void SetParentHander(CNetWork *p);
private:
ACE_SOCK_Acceptor acp;
CNetWork*   net;
};
class CClientService;
// records an online client connection
typedef struct STAPPIDCS
{
std::string  sIp;
int    iPort;
Byte   byAppid;
Byte         byGroup; // group number
CClientService *pcs; // client connection
}STAppidcs;
// holds data that other clients sent to this node
typedef struct STLISTDATA
{
std::string  sAppid;
int    iLength;
Byte   *byData;
}STListData;
/*
Point-to-point data exchange
*/
class CClientService : public ACE_Event_Handler
{
public:
// constructor / destructor
CClientService(void){};
~CClientService(){};
// inherited from the base class
virtual ACE_HANDLE get_handle()const;
virtual int  handle_input(ACE_HANDLE fd = ACE_INVALID_HANDLE);   // receive data and append it to the list
virtual int  handle_close(ACE_HANDLE handle, ACE_Reactor_Mask mask); // connection closed; release resources
public:
int  Open();
void SetParentHander(CNetWork *p);
int  connect(ACE_INET_Addr addr);         // connect to another client
int  SendData(const char* buf, const int nSize);      // send data to another client
ACE_SOCK_Stream &peer(){return sockeam;}

private:
void AddAppid(STAppidcs STAppidcs);         // record a client connection
void DeleteAppid(CClientService *cs);        // remove a client connection
void ReadDataToList(const char* buf, const int nSize);    // append received data to the list
void ChackProtocol(const char* buf, const int nSize);    // parse a frame
private:
ACE_SOCK_Connector con;           
ACE_SOCK_Stream  sockeam;
STAppidcs   stAppidcs;           // info of the most recent client connection
CNetWork   *net;
};
class CTaskTimer;
/*
User-facing service class; owns the data
*/
class CNetWork:public ACE_Task_Base
{
public:
// constructor / destructor
CNetWork(void);
virtual ~CNetWork(void);
public:
bool Open(Byte byappid, int igroup = 0, int itype = 0);    // server address; byappid identifies this module
int  SendData(const Byte* buf, const int nSize, Byte byappid = 0xff);
int  GetDataLength();            // returns the length of the next data item
int  GetData(Byte* buf);            // returns the data
void Close();
bool ReadServerIP();             // read the configured IPs
int  svc(void);              // thread entry point

public:
std::map<Byte, STAppidcs> mpInfo;          // table of client info
  std::list<STListData>  lstData;         // list of received data
int  iprtype;              // flag: is this the master control module
Byte byAppid;              // local APPID
int  iport;               // PORT assigned by the server
std::string sip;              // local IP
Byte byGroup;              // group number
STAppidcs  stAppidcs;            // most recent client info returned by the server; used by CServerUdpcs.SaveMapinfo
CServerUdpcs *pServerUdp;
CClientAcceptor *pCAcptor;
CClientService  *pCService;
CTaskTimer  *ptimer;
std::string  sServerIP;
std::string  sLocalIP;
bool   b_run;
private:
int GroupSend(const char* buf, const int nSize);      // send to the whole group
int SingleSend(const char* buf, const int nSize, Byte byappid);   // send to one peer

};
// Timer class
class CTaskTimer : public ACE_Task_Base
{
public:
// constructor / destructor
CTaskTimer(){timeid = 0;};
virtual ~CTaskTimer(){};
virtual int  handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask); // stop the timer
public:
bool Open(CNetWork *p);
int  handle_timeout(const ACE_Time_Value &current_time, const void *act = 0);// timer callback: send heartbeat frames

private:
CNetWork  *pnet;
long   timeid;
};

#endif // end of _NETWORK_H_

 

NetWork.cpp

#include "./NetWork.h"

unsigned char checkFrame(unsigned char* uc_buf, unsigned short uc_length)            
{

return 0;
}
void fixFrame(unsigned char* uc_buf,
    unsigned char& uc_Length,
    unsigned char flag,
    unsigned char dest_addr,
    unsigned char src_addr)
{
;
}
///////////////////////////////////////////
///*
CNetWork::CNetWork()
{
}
CNetWork::~CNetWork()
{
}
bool CNetWork::Open(Byte byappid, int igroup, int itype)
{
b_run = false;
iprtype = itype;
byAppid = byappid;
byGroup = (Byte)igroup;
ACE_NEW_RETURN(pServerUdp, CServerUdpcs, false);
ACE_NEW_RETURN(pCAcptor, CClientAcceptor, false);
ACE_NEW_RETURN(ptimer, CTaskTimer, false);
if (!ReadServerIP()) return false;
// request a connection from the server and log in
pServerUdp->SetParentHander(this);
if (!pServerUdp->Open(byAppid, byGroup)) return false;

// listen for peer connections
pCAcptor->SetParentHander(this);
// start the worker thread
activate();
b_run = true;
return true;
}
int CNetWork::svc(void)
{
ACE_Reactor rt;
this->reactor(&rt);
this->pServerUdp->reactor(&rt);
this->pCAcptor->reactor(&rt);
this->ptimer->reactor(&rt);
this->pServerUdp->Open_Event();
this->pCAcptor->open();
this->ptimer->Open(this);
rt.run_reactor_event_loop();
ACE_OS::sleep(ACE_Time_Value(1));
return 0;
}
int CNetWork::SendData(const Byte* buf, const int nSize, Byte byappid)
{
int isize = 0;
if (byappid == 0xff)      // group send
{
  isize = GroupSend((char*)buf, nSize);
}
else if (byappid == ServerAppid)   // heartbeat to the server
{
  isize = pServerUdp->SendData((char*)buf, nSize);
}
else          // single point-to-point send
{
  isize = SingleSend((char*)buf, nSize, byappid);
}
return isize;
}
int CNetWork::GetDataLength()
{
if (iprtype && pServerUdp->iClientStateSign) return (mpInfo.size() + 17);
if (lstData.empty()) return 0;
std::list<STListData>::iterator lstIter;
lstIter = lstData.begin();
return lstIter->iLength;
}
int CNetWork::GetData(Byte* buf)
{
if (iprtype && pServerUdp->iClientStateSign)
{
  std::map<Byte, STAppidcs>::iterator mpIter;
  unsigned char isize = 0;
  
  pServerUdp->iClientStateSign = 0;
  strcpy((char*)buf, "\x03\x01\x01");
  buf[3] = mpInfo.size() + 4;
  isize = 4;
  for (mpIter = mpInfo.begin(); mpIter != mpInfo.end(); mpIter++)
  {
   if (mpIter->second.sIp == sip) buf[isize++] = mpIter->second.byAppid;
  }
  fixFrame(buf, isize, 0x01, byAppid, ServerAppid);
  return isize;
}
if(lstData.empty()) return 0;
std::list<STListData>::iterator lstIter;
int ilen = 0;

// return the user data
mlocka.acquire();
lstIter = lstData.begin();
ilen = lstIter->iLength;
memcpy(buf, lstIter->byData, ilen);
delete []lstIter->byData;
lstData.erase(lstIter);
mlocka.release();
return ilen;
}
void CNetWork::Close()
{
std::map<Byte, STAppidcs>::iterator mpIter;
Byte isize = 0x0D;
char  buf[255] = {0};
ACE_INET_Addr taddr(iport, sip.c_str());
ACE_UINT32  ip = taddr.get_ip_address();
u_short   iprt = taddr.get_port_number();
/*
strcpy(buf, "/x01/x01/x01/x0D");
buf[5] = byGroup;
memcpy(buf + 6, &ip, sizeof(ACE_UINT32));
memcpy(buf + 6 + sizeof(ACE_UINT32), &iprt, sizeof(u_short));
memcpy(buf + 6 + sizeof(ACE_UINT32) + sizeof(u_short), &byAppid, sizeof(Byte));
fixFrame((unsigned char*)buf, isize, 0x01, ServerAppid, byAppid);
*/
isize = pServerUdp->SendData(buf, isize);   
// tear down the listeners
ptimer->handle_close(ACE_INVALID_HANDLE, 0);
pCAcptor->handle_close(ACE_INVALID_HANDLE, 0);
pServerUdp->handle_close(ACE_INVALID_HANDLE, 0);

// close the peer network connections
for (mpIter = mpInfo.begin(); mpIter != mpInfo.end(); mpIter++)
{
  if (mpIter->second.pcs != NULL)
  {   
   mpIter->second.pcs->handle_close(ACE_INVALID_HANDLE, 0);
   mpIter->second.pcs = NULL;
  }
}
// clear the table
mlock_mp.acquire();
mpInfo.clear();
mlock_mp.release();
if (b_run && this->reactor()->reactor_event_loop_done() != 1) this->reactor()->end_reactor_event_loop();
}
int CNetWork::GroupSend(const char* buf, const int nSize)
{
std::map<Byte, STAppidcs>::iterator mpIter;
int isize = 0;
for (mpIter = mpInfo.begin(); mpIter!= mpInfo.end(); mpIter++)
{
  if (mpIter->second.byGroup == byGroup)
  {
   isize = SingleSend(buf, nSize, mpIter->second.byAppid);
  }
}
return isize;
}
int CNetWork::SingleSend(const char* buf, const int nSize, Byte byappid)
{
std::map<Byte, STAppidcs>::iterator mpIter;
Byte isize = 0;

for (mpIter = mpInfo.find(byappid); mpIter != mpInfo.end(); mpIter++)
{
  if (mpIter->first != byappid) break;   
  if (mpIter->second.pcs != NULL)    // connection already established
  {
   CClientService* p = mpIter->second.pcs;
   isize = p->SendData(buf, nSize);
  }
  else          // no connection yet; establish a new one to the peer
  {
   CClientService* cs;
   ACE_INET_Addr addr(mpIter->second.iPort, mpIter->second.sIp.c_str());
   ACE_NEW_RETURN(cs, CClientService, 0);
   cs->SetParentHander(this);
   cs->reactor(this->reactor());
   if (cs->connect(addr) != -1)
   {
    char  puf[255] = {0};
   
    cs->Open();
    mpIter->second.pcs = cs;   // record the connection in the table
         
    strcpy(puf, "\x01\x01\x01\x06\x02");
    puf[5] = byGroup;
    isize = 0x06;
    fixFrame((unsigned char*)puf, isize, 0x01, byappid, byAppid);
    isize = mpIter->second.pcs->SendData(puf, isize);
   
    // wait
    ACE_Time_Value t(1);
    ACE_OS::sleep(t);     
    isize = mpIter->second.pcs->SendData(buf, nSize);
   }
   else
   {
    delete cs;
   }
  }   
}
return isize;
}
bool CNetWork::ReadServerIP()
{
char sip1[20] = {0};
char sip2[20] = {0};
FILE* fp = NULL;
if ((fp = fopen("./ServerIp_c.txt", "r")) == NULL) return false;
fscanf(fp, "%s %s", sip1, sip2);
fclose(fp);
sServerIP = sip1;
sLocalIP = sip2;
return true;
}
/////////////////////////////////////////////////////////////////
bool CServerUdpcs::Open(Byte byappid, Byte bygroup)
{
ACE_INET_Addr taddr;
char   puf[255] = {0};
Byte  isize = 0x06;
ACE_Time_Value t(3);
ACE_INET_Addr taddr_local(net->sLocalIP.c_str());
udp.open(taddr_local);
taddr.set(iServerPort, net->sServerIP.c_str());
/*
strcpy(puf, "/x01/x01/x01/x06/x02");
puf[5] = bygroup;
fixFrame((unsigned char*)puf, isize, 0x01, ServerAppid, byappid);
*/
this->udp.send(puf, isize, taddr);

// must block waiting for the server to return the assigned port; otherwise listening cannot be set up
memset(puf, 0x00, 255);
ACE_OS::sleep(ACE_Time_Value(0, 10000));
isize = this->udp.recv(puf, 255, this->praddr, 0, &t);

if (isize > 0 && isize < 255)
{
  ChackProtocol(puf, isize);
  if (net->iport < 20000) // assigned ports are always > 20000
  {
   udp.close();
   return false;
  }
  return true;
}
udp.close();
return false;
}
int CServerUdpcs::Open_Event()
{
return (this->reactor()->register_handler(this, ACE_Event_Handler::READ_MASK));
}
ACE_HANDLE CServerUdpcs::get_handle()const
{
return this->udp.get_handle();
}
int CServerUdpcs::handle_input(ACE_HANDLE fd)
{
char buf[255] = {0};
int isize = this->udp.recv(buf, 255, this->praddr);
if (isize > 0 && isize < 255) ChackProtocol(buf, isize);
return 0;
}
int CServerUdpcs::handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
{
if (this->udp.get_handle() != ACE_INVALID_HANDLE)
{
  ACE_Reactor_Mask m = ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL;
  this->reactor()->remove_handler(this, m);
  this->udp.close();
}
delete this;
return 0;
}
void CServerUdpcs::ChackProtocol(const char* buf, const int nSize)
{
Byte *p = (Byte*)buf;

if (checkFrame(p, nSize)) return;
switch (*(p + 11))
{
  case 0xF0:     // message acknowledgement
  {
   if (*(p + 6) == ServerAppid) ReadDataToList(buf, nSize);
   break;
  }
  case 0x01:     // messages addressed to us
  {
   switch (*(p + 15))
   {
    case 0x03:  // server returned the local IP and assigned port
     SaveLocalhost(buf);
     break;
    case 0x01:  // record online client info (a new client)
     SaveMapinfo(buf);
     break;
    case 0x00:  // update online client info (a client dropped)
     UpdateMapInfo(buf, nSize);
     break;
    default:
     break;
   }
   break;
  }
  default:
   break;
}
}
void CServerUdpcs::SetParentHander(CNetWork *p)
{
net = p;
}
int CServerUdpcs::SendData(const char* buf, const int nSize)
{
return this->udp.send(buf, nSize, this->praddr);
}
int CServerUdpcs::GetData(char* buf, const int nSize)
{
return this->udp.recv(buf, nSize, this->praddr);
}
void CServerUdpcs::SaveLocalhost(const char* buf)
{
net->sip = GetSourceIp(buf);  
net->iport = GetSourcePort(buf);
}
void CServerUdpcs::UpdateMapInfo(const char* buf, const int nSize)
{
std::map<Byte, STAppidcs>::iterator mpIter;
Byte byappid = (Byte)buf[23];
std::string sip = GetSourceIp(buf);
mlock_mp.acquire();
for (mpIter = net->mpInfo.find(byappid); mpIter != net->mpInfo.end(); mpIter++)   
{
  if (mpIter->first != byappid) break;
  if (mpIter->second.sIp != sip) continue;
  if (mpIter->second.pcs != NULL)
  {
   mpIter->second.pcs->handle_close(ACE_INVALID_HANDLE, 0);
   mpIter->second.pcs = NULL;
  }
  net->mpInfo.erase(mpIter);
  break;
}
mlock_mp.release();
iClientStateSign = 1;
}
void CServerUdpcs::ReadDataToList(const char* buf, const int nSize)
{
STListData stlData;
stlData.iLength = nSize;
stlData.byData = new Byte[nSize];
memcpy(stlData.byData, buf, nSize);
//append the data to the list
mlocka.acquire();
net->lstData.push_front(stlData);
mlocka.release();
}
void CServerUdpcs::SaveMapinfo(const char* buf)
{
std::map<Byte, STAppidcs>::iterator mpIter;
Byte *p = (Byte*)buf;
STAppidcs stAppidcs;
bool  b_Insert = false;
/*
stAppidcs.byAppid = *(p + 23);
stAppidcs.byGroup = *(p + 16);
stAppidcs.sIp = GetSourceIp(buf);
stAppidcs.iPort = GetSourcePort(buf);
stAppidcs.pcs = NULL;
*/
mlock_mp.acquire();
for ((mpIter = net->mpInfo.find(stAppidcs.byAppid)); mpIter != net->mpInfo.end(); mpIter++)
{
  if (mpIter->first != stAppidcs.byAppid) break;
  if (mpIter->second.sIp != stAppidcs.sIp) continue;
  b_Insert = true;
  break;
}
if (!b_Insert) net->mpInfo.insert(std::make_pair(stAppidcs.byAppid, stAppidcs));
mlock_mp.release();
iClientStateSign = 1;
}
std::string CServerUdpcs::GetSourceIp(const char* buf)
{
ACE_INET_Addr taddr;
int    iIp = 0;
memcpy(&iIp, buf + 17, 4);
taddr.set(1000, iIp);
std::string sip = taddr.get_host_addr();
return sip;
}
int CServerUdpcs::GetSourcePort(const char* buf)
{
int iport = 0;
memcpy(&iport, buf + 21, 2);
return iport;
}
//////////////////////////////////////////////////////
int CClientAcceptor::open(void * avg)
{  
ACE_INET_Addr addr(net->iport, net->sip.c_str());
this->acp.open(addr, 5);
return this->reactor()->register_handler(this, ACE_Event_Handler::ACCEPT_MASK);
}
ACE_HANDLE CClientAcceptor::get_handle()const
{
return this->acp.get_handle();
}
int CClientAcceptor::handle_input(ACE_HANDLE fd)
{
CClientService* cs = NULL;

cs = new CClientService();
cs->SetParentHander(net);

if (this->acp.accept(cs->peer()) == -1)
{
  delete cs;
  return 0;
}
cs->reactor(this->reactor());
if (cs->Open() == -1) cs->handle_close(ACE_INVALID_HANDLE, 0);

return 0;
}
int CClientAcceptor::handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
{
if (this->acp.get_handle() != ACE_INVALID_HANDLE)
{
  ACE_Reactor_Mask m = ACE_Event_Handler::ACCEPT_MASK | ACE_Event_Handler::DONT_CALL;
  this->reactor()->remove_handler(this, m);
  this->acp.close();
}
delete this;
return 0;
}
void CClientAcceptor::SetParentHander(CNetWork *p)
{
net = p;
}
///////////////////////////////////////////////////
int CClientService::Open()
{  
ACE_INET_Addr peeraddr;
this->sockeam.get_remote_addr(peeraddr);
stAppidcs.iPort = peeraddr.get_port_number();
stAppidcs.sIp =  peeraddr.get_host_addr();
stAppidcs.pcs = this;
return this->reactor()->register_handler(this, ACE_Event_Handler::READ_MASK);
}
ACE_HANDLE CClientService::get_handle()const
{
return this->sockeam.get_handle();
}
int  CClientService::handle_input(ACE_HANDLE fd)
{
char buf[1024] = {0};
int isize = 0;
isize = this->sockeam.recv(buf, 1024);
if (isize <= 0) return -1;
if (isize > 0 && isize < 1024) ChackProtocol(buf, isize);
return 0;
}
int  CClientService::handle_close(ACE_HANDLE handle, ACE_Reactor_Mask mask)
{
// Triggered when the peer disconnects; without the guard below it would run twice (the automatic duplicate call is suppressed)
if (mask == ACE_Event_Handler::WRITE_MASK) return 0;

DeleteAppid(this); // clear the external pointer to this object
mask = ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL;
this->reactor()->remove_handler(this, mask);
this->sockeam.close();
delete this;
return 0;
}
void CClientService::SetParentHander(CNetWork *p)
{
net = p;
}
int CClientService::SendData(const char* buf, const int nSize)
{
int  isize = 0;   
isize = this->sockeam.send_n(buf, nSize);
if (isize <= 0) this->handle_close(0, 0);
return isize;
}
int CClientService::connect(ACE_INET_Addr addr)
{
ACE_Time_Value itimeout(1);
return this->con.connect(sockeam, addr, &itimeout);
}
void CClientService::ChackProtocol(const char* buf, const int nSize)
{
Byte *p = (Byte*)buf;

if ((*(p + 11) == 0x01) && (*(p + 15) == 0x02))
{
  if (!checkFrame(p, nSize))
  {  
   stAppidcs.byAppid = *(p + 6);  // APPID of the client requesting the connection
   stAppidcs.byGroup = *(p + 16);
   AddAppid(stAppidcs);    // record the client connection
   return;
  }
}
ReadDataToList(buf, nSize);     // store the user data
}
void CClientService::AddAppid(STAppidcs stAppidcs)
{
std::map<Byte, STAppidcs>::iterator mpIter;
for (mpIter = net->mpInfo.find(stAppidcs.byAppid); mpIter != net->mpInfo.end(); mpIter++)   
{
  if (mpIter->first != stAppidcs.byAppid) break;
  if (mpIter->second.sIp != stAppidcs.sIp) continue;
  mpIter->second.pcs = this;
  break;
}
}
void CClientService::DeleteAppid(CClientService* cs)
{
std::map<Byte, STAppidcs>::iterator mpIter;
for (mpIter = net->mpInfo.begin(); mpIter != net->mpInfo.end(); mpIter++)   
{
  if (mpIter->second.pcs == cs)
  {
   mpIter->second.pcs = NULL;
   break;
  }
}
}
void CClientService::ReadDataToList(const char* buf, const int nSize)
{
STListData stlData;
stlData.iLength = nSize;
stlData.byData = new Byte[nSize];
memcpy(stlData.byData, buf, nSize);
// append the data to the list
if (net->lstData.size() > 500)
{
  std::list<STListData>::iterator lstIter;
  mlocka.acquire();
  lstIter = net->lstData.begin();
  delete []lstIter->byData;
  net->lstData.erase(lstIter);
  mlocka.release();
}
mlocka.acquire();
net->lstData.push_back(stlData);
mlocka.release();
}
//////////////////////////////////////////////
///*
bool  CTaskTimer::Open(CNetWork *p)
{
pnet = p;
ACE_Time_Value idlay(1);
ACE_Time_Value ival(40);
timeid = this->reactor()->schedule_timer(this, 0, idlay, ival);
return true;
}
int CTaskTimer::handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
{  
if (timeid) this->reactor()->cancel_timer(this);
delete this;
return 0;
}
int CTaskTimer::handle_timeout(const ACE_Time_Value& current_time, const void* act)
{
unsigned char buf[255] = {0};
unsigned char isize = 4;
/*
memcpy(buf, "/x00/x01/x01/x04", 4);
fixFrame(buf, isize, (pnet->iprtype ? 0x02 : 0x01), ServerAppid, pnet->byAppid);
*/
pnet->SendData(buf, isize, ServerAppid);
return 0;
}

 

 

 

 


.NET Reactor is a powerful code-protection and licensing system for developers who want to protect their .NET software; it supports every development language that compiles for .NET.

The previous article covered how .NET Reactor counters the De4Dot unpacking tool; this time we look at the tools that ship with .NET Reactor.

1. Hardware ID Tool Generator

To tie a license to a specific machine, you bundle the HID.exe tool with your distributable program. Your customer runs HID.exe to generate a hardware identification file (*.hid) and submits that file with their license request. Processing the .hid file with the .NET Reactor License Manager produces a license for the identified machine within seconds.

The Hardware ID Tool Generator produces the customized, distributable HID.exe. You must configure it to match the HID settings you specified for the protected assembly; then click "Generate" to emit the custom HID.exe into the protected assembly's target directory.

Hardware ID Tool Generator

2. License Reactivation Tool

After a license has been invalidated, this tool can reactivate it. If you deploy hardware-ID locks, you can combine the license invalidation tool and the license reactivation tool to enforce invalidation on one machine, allowing the licensee to move the installation to another.

To reactivate a license, load the .NET Reactor project, open the License Reactivation Tool, enter the hardware ID of the machine in question, and click "Generate Reactivation Code". Each click produces a fresh activation code, and each code is valid for exactly one reactivation. The application then reactivates its license by calling the Status.ReactivateLicense(string code) method of the License.dll library.

License Reactivation Tool

3. Stack Trace Deobfuscator

To deobfuscate a stack trace, load the corresponding mapping file (generated by the protected output settings), paste in the obfuscated stack trace, and click "Deobfuscate".

Stack Trace Deobfuscator

4. ShareIt Module Generator

.NET Reactor makes it easy to skip building a ShareIt module yourself: the ShareIt Module Generator uses your .NET Reactor project file to create a ShareIt license-file generator, sparing you the overhead of producing ShareIt license files.

ShareIt Module Generator

Download the latest .NET Reactor trial
