Create a new react project
2018-04-26 01:50:00 universsky2015 Views: 56

Reactor Core

Non-Blocking Reactive Streams Foundation for the JVM both implementing a Reactive Extensions inspired API and efficient event streaming support.

Getting it

Reactor 3 requires Java 8 or later to run.

With Gradle from repo.spring.io or Maven Central repositories (stable releases only):

    repositories {
//      maven { url 'http://repo.spring.io/snapshot' }
      maven { url 'http://repo.spring.io/milestone' }
      mavenCentral()
    }

    dependencies {
      //compile "io.projectreactor:reactor-core:3.1.4.RELEASE"
      //testCompile("io.projectreactor:reactor-test:3.1.4.RELEASE")
      compile "io.projectreactor:reactor-core:3.2.0.M1"
      testCompile("io.projectreactor:reactor-test:3.2.0.M1")
    }

See the reference documentation for more information on getting it (e.g. using Maven, or how to get milestones and snapshots).

Note about Android support: Reactor 3 does not officially support or target Android. However, it should work fine with Android SDK 26 (Android O) and above. See the complete note in the reference guide.

Getting Started

New to Reactive Programming or bored of reading already? Try the Introduction to Reactor Core hands-on!

If you are familiar with RxJava or want a more detailed introduction, be sure to check
https://www.infoq.com/articles/reactor-by-example

Flux

A Reactive Streams Publisher with basic flow operators.

  • Static factories on Flux allow for source generation from arbitrary callback types.
  • Instance methods allow operational building, materialized on each Flux#subscribe() or by multicasting operations such as Flux#publish and Flux#publishNext.

<img src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/flux.png" width="500">

Flux in action:

Flux.fromIterable(getSomeLongList())
    .mergeWith(Flux.interval(Duration.ofMillis(100))) // Duration overload; the millisecond unit is assumed
    .doOnNext(serviceA::someObserver)
    .map(d -> d * 2)
    .take(3)
    .onErrorResume(errorHandler::fallback)
    .doAfterTerminate(serviceM::incrementTerminate)
    .subscribe(System.out::println);

Mono

A Reactive Streams Publisher constrained to ZERO or ONE element with appropriate operators.

  • Static factories on Mono allow for deterministic zero or one sequence generation from arbitrary callback types.
  • Instance methods allow operational building, materialized each time Mono#subscribe() or Mono#block() is eventually called.

<img src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/mono.png" width="500">

Mono in action:

Mono.fromCallable(System::currentTimeMillis)
    .flatMap(time -> Mono.first(serviceA.findRecent(time), serviceB.findRecent(time)))
    .timeout(Duration.ofSeconds(3), errorHandler::fallback)
    .doOnSuccess(r -> serviceM.incrementSuccess())
    .subscribe(System.out::println);

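Blocking Mono result:

Tuple2<Long, Long> nowAndLater =
        Mono.zip(
                Mono.just(System.currentTimeMillis()),
                // Mono.delay replaces the original Flux.just(1).delay(1); the one-second unit is assumed
                Mono.delay(Duration.ofSeconds(1)).map(i -> System.currentTimeMillis()))
            .block();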

Schedulers

Reactor uses a Scheduler as a contract for arbitrary task execution. It provides some guarantees required by Reactive Streams flows, such as FIFO execution.

You can use or create efficient schedulers to switch threads on the producing side of a flow (subscribeOn) or on the receiving side (publishOn):


Mono.fromCallable( () -> System.currentTimeMillis() )
    .repeat()
    .publishOn(Schedulers.single())
    .log("foo.bar")
    .flatMap(time ->
        Mono.fromCallable(() -> { Thread.sleep(1000); return time; })
            .subscribeOn(Schedulers.parallel())
    , 8) //maxConcurrency 8
    .subscribe();

ParallelFlux

ParallelFlux can saturate your CPUs with any sequence whose work can be subdivided into concurrent
tasks. Turn it back into a Flux with ParallelFlux#sequential(), an unordered join, or
use arbitrary merge strategies via groups().

Mono.fromCallable( () -> System.currentTimeMillis() )
    .repeat()
    .parallel(8) //parallelism
    .runOn(Schedulers.parallel())
    .doOnNext( d -> System.out.println("I'm on thread "+Thread.currentThread()) )
    .subscribe();

Custom sources: Flux.create and FluxSink, Mono.create and MonoSink

To bridge a Subscriber or Processor to an outside context that produces signals non-concurrently (for example, a listener-based API), use Flux#create or Mono#create.

Flux.create(sink -> {
         ActionListener al = e -> {
            sink.next(textField.getText());
         };

         // register the listener (this alone gives no cancellation support):
         button.addActionListener(al);

         // cancellation support: unregister the listener when the subscriber cancels
         sink.onCancel(() -> {
            button.removeActionListener(al);
         });
    },
    // Overflow (backpressure) handling, default is BUFFER
    FluxSink.OverflowStrategy.LATEST)
    .timeout(Duration.ofSeconds(3)) // Duration overload; the unit of the original timeout(3) is assumed to be seconds
    .doOnComplete(() -> System.out.println("completed!"))
    .subscribe(System.out::println);

The Backpressure Thing

Most of this cool stuff uses a bounded ring buffer implementation under the hood to absorb the difference in processing speed between producers and consumers. Operators, processors, and any other standard Reactive Streams component working on the sequence are instructed to let data flow in when, and only when, these buffers have free room. This gives us both a deterministic capacity model (bounded buffers) and a pipeline that never blocks (more data is requested as write capacity becomes available). It's not rocket science after all, and the boring part is already being handled by us in collaboration with the ongoing Reactive Streams Commons research effort.
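
The request-driven part of this model can be sketched without Reactor at all, using the Java 9 Flow interfaces mentioned further down. This is only an illustration under stated assumptions: BackpressureSketch, RangePublisher and BatchingSubscriber are hypothetical names, the code is single-threaded, and it makes no claim about how Reactor's own operators or ring buffers are implemented. It shows a publisher that emits only what has been requested, and a subscriber that asks for a new batch once the previous one has been consumed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class BackpressureSketch {

    /** Emits 1..count, but never more items than the subscriber has requested. Not thread-safe. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) { this.count = count; }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private long demand = 0;
                private boolean emitting = false;
                private boolean done = false;

                @Override
                public void request(long n) {
                    if (done || n <= 0) return;   // simplification: non-positive requests are ignored
                    demand += n;
                    if (emitting) return;         // re-entrant request from onNext: the loop below sees it
                    emitting = true;
                    while (!done && demand > 0) {
                        if (next > count) { done = true; subscriber.onComplete(); break; }
                        demand--;
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                }

                @Override
                public void cancel() { done = true; }
            });
        }
    }

    /** Requests items in fixed-size batches, so at most `batch` items are ever outstanding. */
    static final class BatchingSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        final int batch;
        int requestCalls = 0;
        boolean completed = false;
        Throwable error;
        private Flow.Subscription subscription;
        private int remainingInBatch;

        BatchingSubscriber(int batch) { this.batch = batch; }

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; requestBatch(); }

        @Override public void onNext(Integer item) {
            received.add(item);
            if (--remainingInBatch == 0) requestBatch(); // room is free again: ask for more
        }

        @Override public void onError(Throwable t) { error = t; }

        @Override public void onComplete() { completed = true; }

        private void requestBatch() {
            remainingInBatch = batch;   // set before request(), which may call onNext synchronously
            requestCalls++;
            subscription.request(batch);
        }
    }

    static BatchingSubscriber run(int count, int batch) {
        BatchingSubscriber subscriber = new BatchingSubscriber(batch);
        new RangePublisher(count).subscribe(subscriber);
        return subscriber;
    }

    public static void main(String[] args) {
        BatchingSubscriber s = run(5, 2);
        // prints: [1, 2, 3, 4, 5] completed=true requests=3
        System.out.println(s.received + " completed=" + s.completed + " requests=" + s.requestCalls);
    }
}
```

The batch size plays the role of the bounded buffer: the producer is never blocked, it simply has nothing to emit until the consumer signals capacity again.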

What's more in it?

"Operator Fusion" (flow optimizers), health state observers, helpers to build custom reactive components, bounded queue generator, hash-wheel timer, converters from/to Java 9 Flow, Publisher and Java 8 CompletableFuture. The repository contains a reactor-test project with test features like the StepVerifier.


Reference Guide

http://projectreactor.io/docs/core/release/reference/docs/index.html

Javadoc

https://projectreactor.io/docs/core/release/api/

Getting started with Flux and Mono

https://github.com/reactor/lite-rx-api-hands-on

Reactor By Example

https://www.infoq.com/articles/reactor-by-example

Head-First Spring & Reactor

https://github.com/reactor/head-first-reactive-with-spring-and-reactor/

Beyond Reactor Core

  • Everything to jump outside the JVM with the non-blocking drivers from Reactor Netty.
  • Reactor Addons provide adapters and extra operators for Reactor 3.

Powered by Reactive Streams Commons

Licensed under Apache Software License 2.0

Sponsored by Pivotal

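2018-07-16 23:40:03 zxm342698145 Views: 1057

I have been studying the source code of network programming projects lately and found that many open-source projects use the reactor networking pattern, for example libevent, skynet, and muduo. I now have a reasonable understanding of the reactor pattern.

The Reactor pattern is one of the essential techniques for writing high-performance network servers. It has the following advantages:
1) Fast response: it does not have to block on any single synchronous event, although the Reactor itself is still synchronous;
2) Relatively simple programming: it avoids complex multithreading and synchronization problems as far as possible, and avoids the overhead of thread/process switching;
3) Scalability: CPU resources can be fully used simply by increasing the number of Reactor instances;
4) Reusability: the reactor framework itself is independent of the specific event-handling logic, so it is highly reusable.

Using the Reactor model requires several components: event sources, the Reactor framework, a multiplexing mechanism, and event handlers. Together they make up the overall framework of the Reactor model; each component is described in turn below.

1) Event source

On Linux this is a file descriptor; on Windows it is a Socket or a Handle. Here they are collectively called the "handle set". The program registers the events it cares about, such as I/O events, on the specified handles.

2) Event demultiplexer (the event multiplexing and dispatch mechanism)

This is an I/O multiplexing mechanism provided by the operating system, such as select and epoll. The program first registers the handles (event sources) it cares about, together with their events, with the event demultiplexer.

When an event arrives, the event demultiplexer notifies the program that "among the registered handles, the events of one or more handles are ready". After receiving the notification, the program can handle the events without blocking.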
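
The register / notify / handle cycle described above can be tried out with the JDK's java.nio Selector, which wraps the operating system's multiplexing mechanism. This is a minimal sketch and not the implementation discussed in this post: DemultiplexerSketch and roundTrip are hypothetical names, and a java.nio Pipe stands in for a socket so that the example needs no network. It is written in Java rather than C++ to match the Reactor Core examples earlier on this page.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;

final class DemultiplexerSketch {

    /** Registers a pipe's read end, makes it ready by writing to the pipe, and reports what happened. */
    static String roundTrip(String message) {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SourceChannel source = pipe.source();
                 Pipe.SinkChannel sink = pipe.sink()) {
                // 1. Register the handle and the event we care about (readable).
                source.configureBlocking(false);
                source.register(selector, SelectionKey.OP_READ);

                // Nothing has been written yet, so no handle is ready.
                int readyBefore = selector.selectNow();

                // Make the event happen: a short message arrives on the pipe.
                sink.write(ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8)));

                // 2. Wait (up to one second) for the "handles are ready" notification.
                int readyAfter = selector.select(1000);

                // 3. Handle each ready event; the read returns at once because data is already there.
                StringBuilder received = new StringBuilder();
                for (SelectionKey key : selector.selectedKeys()) {
                    if (key.isReadable()) {
                        ByteBuffer buffer = ByteBuffer.allocate(256);
                        int n = ((Pipe.SourceChannel) key.channel()).read(buffer);
                        if (n > 0) {
                            received.append(new String(buffer.array(), 0, n, StandardCharsets.UTF_8));
                        }
                    }
                }
                selector.selectedKeys().clear();
                return readyBefore + "/" + readyAfter + "/" + received;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Format: ready handles before the write / ready handles after the write / data read
        System.out.println(roundTrip("hello"));
    }
}
```

The sketch assumes the message is short enough to be read in one call; a real handler would keep reading until the channel reports no more data.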

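3) Reactor

The Reactor is the event-management interface. Internally it uses the event demultiplexer to register and unregister events, and it runs the event loop: when an event becomes "ready", it calls the callback registered for that event to handle it.

A typical Reactor declaration looks like this: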

class Reactor  {
public:
    Reactor()  {}
    ~Reactor(){}

    int RegisterHandler(EventHandler* handler, int event);

    int RemoveHandler(EventHandler*);

    void HandleEvents();
private:
    EventDemultiplexer*  m_demultiplexer;
};

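4) Event Handler

An event handler provides a set of interfaces, each corresponding to one type of event. The Reactor calls the matching interface when that event occurs so that the event gets handled. A handler is usually bound to a valid handle.

A typical Event Handler class declaration looks like this: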

class EventHandler  {
public:
    virtual ~EventHandler() {}  // virtual destructor so handlers can be deleted through a base pointer
    virtual void HandleRead() = 0;
    virtual void HandleWrite() = 0;
    virtual void HandleError() = 0;
};
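
To see how the Reactor and EventHandler declarations fit together, here is a minimal single-threaded sketch of the same shape in Java, with a java.nio Selector playing the role of EventDemultiplexer. It is an illustration under stated assumptions and not the implementation linked below: ReactorSketch, its method names, and the use of a Pipe as the event source are all hypothetical choices, and handleError takes an exception argument for convenience.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class ReactorSketch {

    /** Counterpart of the EventHandler declaration: one callback per event type. */
    interface EventHandler {
        void handleRead() throws IOException;
        default void handleWrite() throws IOException {}
        default void handleError(Exception e) {}
    }

    /** Counterpart of the Reactor declaration. */
    static final class Reactor implements AutoCloseable {
        private final Selector demultiplexer;

        Reactor() throws IOException { demultiplexer = Selector.open(); }

        void registerHandler(SelectableChannel channel, EventHandler handler, int events) throws IOException {
            channel.configureBlocking(false);
            channel.register(demultiplexer, events, handler); // the handler travels as the key attachment
        }

        void removeHandler(SelectableChannel channel) {
            SelectionKey key = channel.keyFor(demultiplexer);
            if (key != null) key.cancel();
        }

        /** One turn of the event loop; timeoutMillis must be positive. Returns the number of handles dispatched. */
        int handleEvents(long timeoutMillis) throws IOException {
            demultiplexer.select(timeoutMillis);
            int dispatched = 0;
            Iterator<SelectionKey> it = demultiplexer.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                EventHandler handler = (EventHandler) key.attachment();
                try {
                    if (key.isValid() && key.isReadable()) handler.handleRead();
                    if (key.isValid() && key.isWritable()) handler.handleWrite();
                } catch (IOException e) {
                    handler.handleError(e);
                }
                dispatched++;
            }
            return dispatched;
        }

        @Override public void close() throws IOException { demultiplexer.close(); }
    }

    /** Sends two messages through a registered handler, then removes the handler and sends a third. */
    static List<String> demo() {
        List<String> seen = new ArrayList<>();
        try {
            Pipe pipe = Pipe.open();
            try (Reactor reactor = new Reactor();
                 Pipe.SourceChannel source = pipe.source();
                 Pipe.SinkChannel sink = pipe.sink()) {
                reactor.registerHandler(source, () -> {
                    ByteBuffer buffer = ByteBuffer.allocate(256);
                    int n = source.read(buffer);
                    if (n > 0) seen.add(new String(buffer.array(), 0, n, StandardCharsets.UTF_8));
                }, SelectionKey.OP_READ);

                sink.write(ByteBuffer.wrap("ping".getBytes(StandardCharsets.UTF_8)));
                reactor.handleEvents(1000);   // dispatches handleRead for "ping"
                sink.write(ByteBuffer.wrap("pong".getBytes(StandardCharsets.UTF_8)));
                reactor.handleEvents(1000);   // dispatches handleRead for "pong"

                reactor.removeHandler(source);
                sink.write(ByteBuffer.wrap("ignored".getBytes(StandardCharsets.UTF_8)));
                reactor.handleEvents(100);    // no handler is registered any more, so nothing is called
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Keeping the handler as the selection key's attachment means the Reactor needs no lookup table of its own, which is one simple way to keep the framework independent of the event-handling logic.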

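The complete implementation is here:

https://github.com/shonm520/my-reactor

I am at home on paternity leave without my office setup, my MacBook does not support Linux epoll, and my home network is slow, so it took a lot of effort to download VirtualBox and install a Linux virtual machine; by the time the environment was ready it was the middle of the night. Writing and debugging code on a MacBook really is less convenient than on the dual-monitor desktop at work. It also took much of the next day to fix the bugs before I could upload the code to git. I am recording it here.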

2014-10-23 20:01:36 wsb19871010 Views: 346

A simple use of the ACE Reactor framework

Server:
#include <cstring>
#include <iostream>
#include "ace/OS_NS_unistd.h"
#include "ace/INET_Addr.h"
#include "ace/SOCK_Stream.h"
#include "ace/SOCK_Acceptor.h"
#include "ace/Event_Handler.h"
#include "ace/Reactor.h"
#include "ace/Thread_Manager.h"

using namespace std;

class Server : public ACE_Event_Handler
{
public:
	Server(ACE_Reactor *reactor);

	int Open(const char *ip, int port);

	virtual int handle_input(ACE_HANDLE fd = ACE_INVALID_HANDLE);
	virtual int handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask);

protected:
	~Server(){}

private:
	ACE_SOCK_Acceptor acceptor;
	ACE_Thread_Manager threadManager;
};

class ClientHandle : public ACE_Event_Handler
{
public:
	ClientHandle(ACE_HANDLE handle, ACE_Reactor *re);

	virtual int handle_input(ACE_HANDLE fd = ACE_INVALID_HANDLE);
	virtual int handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask);

	ACE_SOCK_Stream &GetSockStream(){ return stream; }

protected:
	~ClientHandle(){}

private:
	ACE_SOCK_Stream stream;
};

ACE_THR_FUNC_RETURN ClientThread(void *param);

int main(int argc, char *argv[])
{
	Server *server = new Server(ACE_Reactor::instance());
	server->Open("192.168.60.65", 9000);

	// Start the event loop
	ACE_Reactor::instance()->run_event_loop();
	return 0;
}

Server::Server(ACE_Reactor *reactor)
:ACE_Event_Handler(reactor)
{

}

int Server::Open(const char *ip, int port)
{
	ACE_INET_Addr addr(port, ip);
	if (acceptor.open(addr) < 0)
	{
		return -1;
	}

	// Register to be notified of incoming client connections
	return reactor()->register_handler(acceptor.get_handle(), this, ACE_Event_Handler::ACCEPT_MASK);
}

int Server::handle_input(ACE_HANDLE fd)
{
	if (fd != acceptor.get_handle())
	{
		return -1;
	}

	ACE_SOCK_Stream stream;
	if (acceptor.accept(stream) < 0)
	{
		return -1;
	}

	ClientHandle *clientHandle = new ClientHandle(stream.get_handle(), reactor());
	threadManager.spawn(ClientThread, clientHandle);

	return 0;
}

int Server::handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
{
	acceptor.close();
	delete this;
	return 0;
}

ClientHandle::ClientHandle(ACE_HANDLE handle, ACE_Reactor *re)
{
	stream.set_handle(handle);
	reactor(re);
}

int ClientHandle::handle_input(ACE_HANDLE fd)
{
	if (stream.get_handle() != fd)
	{
		return -1;
	}

	char buf[1024] = { 0 };
	if (stream.recv(buf, 1024) <= 0)
	{
		return -1;
	}
	cout << "Client:" << buf << endl;

	return 0;
}

int ClientHandle::handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
{
	stream.send("bye client", strlen("bye client"));
	stream.close();
	delete this;
	return 0;
}

ACE_THR_FUNC_RETURN ClientThread(void *param)
{
	ClientHandle *clientHandle = (ClientHandle *)param;
	clientHandle->reactor()->register_handler(clientHandle->GetSockStream().get_handle(), clientHandle, ACE_Event_Handler::READ_MASK);
	
	ACE_OS::sleep(2);
	clientHandle->GetSockStream().send("hello client", strlen("hello client"));
	ACE_OS::sleep(2);
	clientHandle->reactor()->remove_handler(clientHandle->GetSockStream().get_handle(), ACE_Event_Handler::READ_MASK);
	ACE_OS::sleep(2);

	return 0;
}

Client:
#include <cstring>
#include <iostream>
#include "ace/INET_Addr.h"
#include "ace/SOCK_Stream.h"
#include "ace/SOCK_Connector.h"
#include "ace/Event_Handler.h"
#include "ace/Reactor.h"

using namespace std;

class Client : public ACE_Event_Handler
{
public:
	Client(ACE_Reactor *reactor);

	virtual int handle_input(ACE_HANDLE fd = ACE_INVALID_HANDLE);
	virtual int handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask);

	// const char* so that string literals can be passed
	int Connect(const char *ip, int port);
	int Send(const char *buf, int len);

protected:
	~Client(){}

private:
	ACE_SOCK_Stream stream;
	ACE_SOCK_Connector connector;
};

int main(int argc, char *argv[])
{
	Client *client = new Client(ACE_Reactor::instance());
	if (client->Connect("192.168.60.65", 9000) < 0)
	{
		return 0;
	}

	client->Send("hello server", strlen("hello server"));

	// Start the event loop
	ACE_Reactor::instance()->run_event_loop();
	return 0;
}

Client::Client(ACE_Reactor *reactor)
:ACE_Event_Handler(reactor)
{

}

int Client::Connect(const char *ip, int port)
{
	ACE_INET_Addr addr(port, ip);
	// Connect to the server
	if (connector.connect(stream, addr) < 0)
	{
		return -1;
	}
	// Register for read events
	reactor()->register_handler(stream.get_handle(), this, ACE_Event_Handler::READ_MASK);

	return 0;
}

int Client::Send(const char *buf, int len)
{
	return stream.send(buf, len);
}

int Client::handle_input(ACE_HANDLE fd)
{
	char buf[1024] = { 0 };
	if (stream.recv(buf, 1024) <= 0)
	{
		return -1;
	}
	cout << "Server:" << buf << endl;

	return 0;
}

int Client::handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
{
	stream.close();
	delete this;
	return 0;
}

