为您推荐:
精华内容
最热下载
问答
  • 5星
    738KB guoruibin123 2021-05-31 15:03:04
  • 3KB weixin_42116791 2021-05-23 09:57:14
  • 6KB weixin_42148053 2021-05-05 15:46:01
  • 13KB weixin_42116701 2021-04-28 14:33:30
  • 4星
    249KB mfc_mfc 2018-12-17 16:48:33
  • 40KB robertbaker 2013-12-20 10:12:08
  • 72KB weixin_42157188 2021-05-08 12:02:36
  • 什么是ringbuffer 嗯,正如名字所说的一样,它是一个环(首尾相接的环),你可以把它用做在不同上下文(线程、协程)间传递数据的buffer。 基本来说,ringbuffer拥有一个固定长度,且每个位置有一个序号,并且是...

    什么是ringbuffer

    嗯,正如名字所说的一样,它是一个环(首尾相接的环),你可以把它用做在不同上下文(线程、协程)间传递数据的buffer。

    基本来说,ringbuffer拥有一个固定长度,且每个位置有一个序号,并且是连续的。

    随着你不停地填充这个buffer(可能也会有相应的读取),这个序号会一直增长,直到绕过这个环。

    一般,ringbuffer都是由数组实现,而由于其在内存上的连续性,因此性能得到了极高的提升。

    从数组上看,是这样的,不再是环形。

    一般定义的数据结构

    type RingBuffer struct{
    	buffer []interface{}
    	read	uint64  //读的位置
    	write	uint64  //写的位置
    	size	 uint64  //缓冲区大小
    }
    

    几大概念:

    read == write 时,缓冲区为空

    (write + 1) % size == read, 缓冲区满了

    几大困难点

    绕回

    ringbuffer看上去是一个环,但是实际上是一个数组,写入到数组尾部之后需要绕回到数组首部。但是由于数据包的大小是不定大小,所以到了尾部可能会出现数据分割,包一半在尾部一半在开头,对应的读取的时候数据包一半在尾部,一半在开头需要把它们合并起来。

    写入过快,释放覆盖

    当写入速度大于读取速度时,新写入数据会与未读数据发生覆盖,这样就有两种决策:覆盖未读数据和丢弃新写入数据。

    避免重复读取

    由于ringbuffer是通过覆盖写入数据,并不会删除未读数据,所以就通过ringbuffer中的read来判断是否还有未读数据。

    重读/重写的需求

    假设一个消费场景,消费是耗时的,只有当消费者确认了,这个对象已经被消费掉了,才能被释放掉资源。这时候就需要重读/重写

    优点

    我们使用 Ring Buffer 这种数据结构,是因为它给我们提供了可靠的消息传递特性。

    这个理由就足够了,不过它还有一些其他的优点。

    首先,Ring Buffer 比链表要快,因为它是数组,而且有一个容易预测的访问模式。

    CPU 高速缓存友好 (CPU-cache-friendly)-数据可以在硬件层面预加载到高速缓存,因此 CPU 不需要经常回到主内存 RAM 里去寻找 Ring Buffer 的下一条数据。

    Ring Buffer 是一个数组,你可以预先分配内存,并保持数组元素永远有效。这意味着内存垃圾收集(GC)在这种情况下几乎什么也不用做。此外,也不像链表那样每增加一条数据都要创建对象-当这些数据从链表里删除时,这些对象都要被清理掉。

    转载于:https://my.oschina.net/lwl1989/blog/3098086

    展开全文
    chongzong1056 2019-09-15 23:27:15
  • /* * $Id: ringbuffer.c,v 1.1 2004/04/07 22:03:53 wmaycisco Exp $ * ringbuffer.c * Ring Buffer utility.. * * Author: Phil Burk, http://www.softsynth.com * * This program uses the PortAudio Portable Aud...

    /* * $Id: ringbuffer.c,v 1.1 2004/04/07 22:03:53 wmaycisco Exp $ * ringbuffer.c * Ring Buffer utility.. * * Author: Phil Burk, http://www.softsynth.com * * This program uses the PortAudio Portable Audio Library. * For more information see: http://www.audiomulch.com/portaudio/ * Copyright (c) 1999-2000 Ross Bencina and Phil Burk * * Permission is hereby granted, free of charge, to any person obtaining * a copy of this software and associated documentation files * (the "Software"), to deal in the Software without restriction, * including without limitation the rights to use, copy, modify, merge, * publish, distribute, sublicense, and/or sell copies of the Software, * and to permit persons to whom the Software is furnished to do so, * subject to the following conditions: * * The above copyright notice and this permission notice shall be * included in all copies or substantial portions of the Software. * * Any person wishing to distribute modifications to the Software is * requested to send the modifications to the original developer so that * they can be incorporated into the canonical version. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR * ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * */#include #include #include #include "ringbuffer.h"#include /*************************************************************************** * Initialize FIFO. * numBytes must be power of 2, returns -1 if not. */long RingBuffer_Init( RingBuffer *rbuf, long numBytes, void *dataPtr ){ if( ((numBytes-1) & numBytes) != 0) return -1; /* Not Power of two. */ rbuf->bufferSize = numBytes; rbuf->buffer = (char *)dataPtr; RingBuffer_Flush( rbuf ); rbuf->bigMask = (numBytes*2)-1; rbuf->smallMask = (numBytes)-1; return 0;}/***************************************************************************** Return number of bytes available for reading. */long RingBuffer_GetReadAvailable( RingBuffer *rbuf ){ return ( (rbuf->writeIndex - rbuf->readIndex) & rbuf->bigMask );}/***************************************************************************** Return number of bytes available for writing. */long RingBuffer_GetWriteAvailable( RingBuffer *rbuf ){ return ( rbuf->bufferSize - RingBuffer_GetReadAvailable(rbuf));}/***************************************************************************** Clear buffer. Should only be called when buffer is NOT being read. */void RingBuffer_Flush( RingBuffer *rbuf ){ rbuf->writeIndex = rbuf->readIndex = 0;}/***************************************************************************** Get address of region(s) to which we can write data.** If the region is contiguous, size2 will be zero.** If non-contiguous, size2 will be the size of second region.** Returns room available to be written or numBytes, whichever is smaller.*/long RingBuffer_GetWriteRegions( RingBuffer *rbuf, long numBytes, void **dataPtr1, long *sizePtr1, void **dataPtr2, long *sizePtr2 ){ long index; long available = RingBuffer_GetWriteAvailable( rbuf ); if( numBytes > available ) numBytes = available; /* Check to see if write is not contiguous. */ index = rbuf->writeIndex & rbuf->smallMask; if( (index + numBytes) > rbuf->bufferSize ) { /* Write data in two blocks that wrap the buffer. */ long firstHalf = rbuf->bufferSize - index; *dataPtr1 = &rbuf->buffer[index]; *sizePtr1 = firstHalf; *dataPtr2 = &rbuf->buffer[0]; *sizePtr2 = numBytes - firstHalf; } else { *dataPtr1 = &rbuf->buffer[index]; *sizePtr1 = numBytes; *dataPtr2 = NULL; *sizePtr2 = 0; } return numBytes;}/****************************************************************************/long RingBuffer_AdvanceWriteIndex( RingBuffer *rbuf, long numBytes ){ return rbuf->writeIndex = (rbuf->writeIndex + numBytes) & rbuf->bigMask;}/***************************************************************************** Get address of region(s) from which we can read data.** If the region is contiguous, size2 will be zero.** If non-contiguous, size2 will be the size of second region.** Returns room available to be written or numBytes, whichever is smaller.*/long RingBuffer_GetReadRegions( RingBuffer *rbuf, long numBytes, void **dataPtr1, long *sizePtr1, void **dataPtr2, long *sizePtr2 ){ long index; long available = RingBuffer_GetReadAvailable( rbuf ); if( numBytes > available ) numBytes = available; /* Check to see if read is not contiguous. */ index = rbuf->readIndex & rbuf->smallMask; if( (index + numBytes) > rbuf->bufferSize ) { /* Write data in two blocks that wrap the buffer. */ long firstHalf = rbuf->bufferSize - index; *dataPtr1 = &rbuf->buffer[index]; *sizePtr1 = firstHalf; *dataPtr2 = &rbuf->buffer[0]; *sizePtr2 = numBytes - firstHalf; } else { *dataPtr1 = &rbuf->buffer[index]; *sizePtr1 = numBytes; *dataPtr2 = NULL; *sizePtr2 = 0; } return numBytes;}/****************************************************************************/long RingBuffer_AdvanceReadIndex( RingBuffer *rbuf, long numBytes ){ return rbuf->readIndex = (rbuf->readIndex + numBytes) & rbuf->bigMask;}/***************************************************************************** Return bytes written. */long RingBuffer_Write( RingBuffer *rbuf, void *data, long numBytes ){ long size1, size2, numWritten; void *data1, *data2; numWritten = RingBuffer_GetWriteRegions( rbuf, numBytes, &data1, &size1, &data2, &size2 ); if( size2 > 0 ) { memcpy( data1, data, size1 ); data = ((char *)data) + size1; memcpy( data2, data, size2 ); } else { memcpy( data1, data, size1 ); } RingBuffer_AdvanceWriteIndex( rbuf, numWritten ); return numWritten;}/***************************************************************************** Return bytes read. */long RingBuffer_Read( RingBuffer *rbuf, void *data, long numBytes ){ long size1, size2, numRead; void *data1, *data2; numRead = RingBuffer_GetReadRegions( rbuf, numBytes, &data1, &size1, &data2, &size2 ); if( size2 > 0 ) { memcpy( data, data1, size1 ); data = ((char *)data) + size1; memcpy( data, data2, size2 ); } else { memcpy( data, data1, size1 ); } RingBuffer_AdvanceReadIndex( rbuf, numRead ); return numRead;}

    展开全文
    weixin_35915385 2021-01-14 17:14:56
  • 如果您正苦於以下問題:Java RingBuffer.publish方法的具體用法?Java RingBuffer.publish怎麽用?Java RingBuffer.publish使用的例子?那麽恭喜您, 這裏精選的方法代碼示例或許可以為您提供幫助。您也可以進一步...

    本文整理匯總了Java中com.lmax.disruptor.RingBuffer.publish方法的典型用法代碼示例。如果您正苦於以下問題:Java RingBuffer.publish方法的具體用法?Java RingBuffer.publish怎麽用?Java RingBuffer.publish使用的例子?那麽恭喜您, 這裏精選的方法代碼示例或許可以為您提供幫助。您也可以進一步了解該方法所在類com.lmax.disruptor.RingBuffer的用法示例。

    在下文中一共展示了RingBuffer.publish方法的23個代碼示例,這些例子默認根據受歡迎程度排序。您可以為喜歡或者感覺有用的代碼點讚,您的評價將有助於我們的係統推薦出更棒的Java代碼示例。

    示例1: channelRead0

    ​點讚 3

    import com.lmax.disruptor.RingBuffer; //導入方法依賴的package包/類

    @Override

    protected void channelRead0(ChannelHandlerContext ctx, O msg) throws Exception {

    UUID uuid = CHANNEL_UUID.get(ctx.channel());

    if (null != uuid) {

    NetSession session = SESSIONS.get(uuid);

    if (null != session) {

    RingBuffer ringBuffer = THREAD_LOCAL.get().getRingBuffer();

    long next = ringBuffer.next();

    try {

    ConcurrentEvent commandEvent = ringBuffer.get(next);

    commandEvent.setValues(newExecutor(session, msg));

    } finally {

    ringBuffer.publish(next);

    }

    }

    }

    }

    開發者ID:ogcs,項目名稱:Okra-Ax,代碼行數:18,

    示例2: channelRead0

    ​點讚 3

    import com.lmax.disruptor.RingBuffer; //導入方法依賴的package包/類

    @Override

    protected void channelRead0(ChannelHandlerContext ctx, O msg) throws Exception {

    UUID uuid = CHANNEL_UUID.get(ctx.channel());

    if (null != uuid) {

    Session session = SESSIONS.get(uuid);

    if (null != session) {

    RingBuffer ringBuffer = THREAD_LOCAL.get().getRingBuffer();

    long next = ringBuffer.next();

    try {

    ConcurrentEvent commandEvent = ringBuffer.get(next);

    commandEvent.setValues(newExecutor(session, msg));

    } finally {

    ringBuffer.publish(next);

    }

    }

    }

    }

    開發者ID:ogcs,項目名稱:Okra,代碼行數:18,

    示例3: stampSequenceIdAndPublishToRingBuffer

    ​點讚 3

    import com.lmax.disruptor.RingBuffer; //導入方法依賴的package包/類

    protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key,

    WALEdit edits, boolean inMemstore, RingBuffer ringBuffer)

    throws IOException {

    if (this.closed) {

    throw new IOException(

    "Cannot append; log is closed, regionName = " + hri.getRegionNameAsString());

    }

    MutableLong txidHolder = new MutableLong();

    MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> {

    txidHolder.setValue(ringBuffer.next());

    });

    long txid = txidHolder.longValue();

    try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) {

    FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore);

    entry.stampRegionSequenceId(we);

    ringBuffer.get(txid).load(entry);

    } finally {

    ringBuffer.publish(txid);

    }

    return txid;

    }

    開發者ID:apache,項目名稱:hbase,代碼行數:22,

    示例4: synchronousFire

    ​點讚 3

    import com.lmax.disruptor.RingBuffer; //導入方法依賴的package包/類

    @Override

    public void synchronousFire( final Object event ) {

    eventCount.increment();

    CountDownLatch latch = new CountDownLatch( handlers.size() );

    try {

    RingBuffer ringBuffer = disruptor.getRingBuffer();

    long sequence = ringBuffer.next();

    try {

    StatisticEventHolder holder = ringBuffer.get( sequence );

    holder.set( event );

    holder.setLatch( latch );

    } finally {

    ringBuffer.publish( sequence );

    }

    latch.await();

    } catch ( InterruptedException e ) {

    return;

    }

    }

    開發者ID:Tetha,項目名稱:bifroest,代碼行數:20,

    示例5: channelRead0

    ​點讚 2

    import com.lmax.disruptor.RingBuffer; //導入方法依賴的package包/類

    @Override

    protected void channelRead0(ChannelHandlerContext ctx, GpcCall msg) throws Exception {

    NetSession session = NET_SESSION_MAP.get(ctx.channel().id());

    if (null == session)

    return;

    RingBuffer ringBuffer = THREAD_LOCAL.get().getRingBuffer();

    long next = ringBuffer.next();

    try {

    ConcurrentEvent commandEvent = ringBuffer.get(next);

    commandEvent.setValues(newExecutor(session, msg));

    } finally {

    ringBuffer.publish(next);

    }

    }

    開發者ID:ogcs,項目名稱:Okra-Ax,代碼行數:15,

    示例6: channelRead0

    ​點讚 2

    import com.lmax.disruptor.RingBuffer; //導入方法依賴的package包/類

    @Override

    protected void channelRead0(ChannelHandlerContext ctx, O msg) throws Exception {

    NetSession session = SESSIONS.get(ctx.channel().id());

    if (null == session) {

    return;

    }

    RingBuffer ringBuffer = THREAD_LOCAL.get().getRingBuffer();

    long next = ringBuffer.next();

    try {

    ConcurrentEvent commandEvent = ringBuffer.get(next);

    commandEvent.setValues(newExecutor(session, msg));

    } finally {

    ringBuffer.publish(next);

    }

    }

    開發者ID:ogcs,項目名稱:Okra-Ax,代碼行數:16,

    示例7: runDisruptorPass

    ​點讚 2

    import com.lmax.disruptor.RingBuffer; //導入方法依賴的package包/類

    @Override

    protected long runDisruptorPass() throws InterruptedException

    {

    final CountDownLatch latch = new CountDownLatch(1);

    long expectedCount = batchEventProcessor.getSequence().get() + ITERATIONS * BATCH_SIZE;

    handler.reset(latch, expectedCount);

    executor.submit(batchEventProcessor);

    long start = System.currentTimeMillis();

    final RingBuffer rb = ringBuffer;

    for (long i = 0; i < ITERATIONS; i++)

    {

    long hi = rb.next(BATCH_SIZE);

    long lo = hi - (BATCH_SIZE - 1);

    for (long l = lo; l <= hi; l++)

    {

    rb.get(l).setValue(i);

    }

    rb.publish(lo, hi);

    }

    latch.await();

    long opsPerSecond = (BATCH_SIZE * ITERATIONS * 1000L) / (System.currentTimeMillis() - start);

    waitForEventProcessorSequence(expectedCount);

    batchEventProcessor.halt();

    failIfNot(expectedResult, handler.getValue());

    return opsPerSecond;

    }

    開發者ID:winwill2012,項目名稱:disruptor-code-analysis,代碼行數:32,

    示例8: runDisruptorPass

    ​點讚 2

    import com.lmax.disruptor.RingBuffer; //導入方法依賴的package包/類

    @Override

    protected long runDisruptorPass() throws InterruptedException

    {

    final CountDownLatch latch = new CountDownLatch(1);

    long expectedCount = poller.getSequence().get() + ITERATIONS;

    pollRunnable.reset(latch, expectedCount);

    executor.submit(pollRunnable);

    long start = System.currentTimeMillis();

    final RingBuffer rb = ringBuffer;

    for (long i = 0; i < ITERATIONS; i++)

    {

    long next = rb.next();

    rb.get(next).setValue(i);

    rb.publish(next);

    }

    latch.await();

    long opsPerSecond = (ITERATIONS * 1000L) / (System.currentTimeMillis() - start);

    waitForEventProcessorSequence(expectedCount);

    pollRunnable.halt();

    failIfNot(expectedResult, pollRunnable.getValue());

    return opsPerSecond;

    }

    開發者ID:winwill2012,項目名稱:disruptor-code-analysis,代碼行數:28,

    示例9: runDisruptorPass

    ​點讚 2

    import com.lmax.disruptor.RingBuffer; //導入方法依賴的package包/類

    @Override

    protected long runDisruptorPass() throws InterruptedException

    {

    final CountDownLatch latch = new CountDownLatch(1);

    long expectedCount = batchEventProcessor.getSequence().get() + ITERATIONS;

    handler.reset(latch, expectedCount);

    executor.submit(batchEventProcessor);

    long start = System.currentTimeMillis();

    final RingBuffer rb = ringBuffer;

    for (long i = 0; i < ITERATIONS; i++)

    {

    long next = rb.next();

    rb.get(next).setValue(i);

    rb.publish(next);

    }

    latch.await();

    long opsPerSecond = (ITERATIONS * 1000L) / (System.currentTimeMillis() - start);

    waitForEventProcessorSequence(expectedCount);

    batchEventProcessor.halt();

    failIfNot(expectedResult, handler.getValue());

    return opsPerSecond;

    }

    開發者ID:winwill2012,項目名稱:disruptor-code-analysis,代碼行數:28,

    示例10: runDisruptorPass

    ​點讚 2

    import com.lmax.disruptor.RingBuffer; //導入方法依賴的package包/類

    @Override

    protected long runDisruptorPass() throws InterruptedException

    {

    final CountDownLatch latch = new CountDownLatch(1);

    long expectedCount = batchEventProcessor.getSequence().get() + ITERATIONS;

    handler.reset(latch, ITERATIONS);

    executor.submit(batchEventProcessor);

    long start = System.currentTimeMillis();

    final RingBuffer rb = ringBuffer;

    for (long i = 0; i < ITERATIONS; i++)

    {

    long next = rb.next();

    long[] event = rb.get(next);

    for (int j = 0; j < event.length; j++)

    {

    event[j] = i;

    }

    rb.publish(next);

    }

    latch.await();

    long opsPerSecond = (ITERATIONS * ARRAY_SIZE * 1000L) / (System.currentTimeMillis() - start);

    waitForEventProcessorSequence(expectedCount);

    batchEventProcessor.halt();

    PerfTestUtil.failIf(0, handler.getValue());

    return opsPerSecond;

    }

    開發者ID:winwill2012,項目名稱:disruptor-code-analysis,代碼行數:32,

    示例11: runDisruptorPass

    ​點讚 2

    import com.lmax.disruptor.RingBuffer; //導入方法依賴的package包/類

    @Override

    protected long runDisruptorPass() throws Exception

    {

    byte[] data = this.data;

    final CountDownLatch latch = new CountDownLatch(1);

    long expectedCount = processor.getSequence().get() + ITERATIONS;

    handler.reset(latch, ITERATIONS);

    executor.execute(processor);

    long start = System.currentTimeMillis();

    final RingBuffer rb = buffer;

    for (long i = 0; i < ITERATIONS; i++)

    {

    long next = rb.next();

    ByteBuffer event = rb.get(next);

    event.clear();

    event.put(data);

    event.flip();

    rb.publish(next);

    }

    latch.await();

    long opsPerSecond = (ITERATIONS * 1000L) / (System.currentTimeMillis() - start);

    waitForEventProcessorSequence(expectedCount);

    processor.halt();

    return opsPerSecond;

    }

    開發者ID:winwill2012,項目名稱:disruptor-code-analysis,代碼行數:31,

    示例12: publish

    ​點讚 2

    import com.lmax.disruptor.RingBuffer; //導入方法依賴的package包/類

    /**

    * Publish record task to record special table's log.

    *

    * @param struct The table struct.

    * @param list The log data list.

    */

    public void publish(Struct struct, List list) {

    RingBuffer rb = disruptor.getRingBuffer();

    long next = rb.next();

    try {

    LogRecordTask event = rb.get(next);

    event.setValues(struct, list);

    } finally {

    rb.publish(next);

    }

    }

    開發者ID:ogcs,項目名稱:Okra-LOG,代碼行數:17,

    示例13: onNext

    ​點讚 2

    import com.lmax.disruptor.RingBuffer; //導入方法依賴的package包/類

    public static void onNext(E value, RingBuffer> ringBuffer) {

    if (value == null) {

    throw SpecificationExceptions.spec_2_13_exception();

    }

    final long seqId = ringBuffer.next();

    final MutableSignal signal = ringBuffer.get(seqId);

    signal.type = MutableSignal.Type.NEXT;

    signal.value = value;

    ringBuffer.publish(seqId);

    }

    開發者ID:camunda,項目名稱:camunda-bpm-reactor,代碼行數:13,

    示例14: onError

    ​點讚 2

    import com.lmax.disruptor.RingBuffer; //導入方法依賴的package包/類

    public static void onError(Throwable error, RingBuffer> ringBuffer) {

    if (error == null) {

    throw SpecificationExceptions.spec_2_13_exception();

    }

    final long seqId = ringBuffer.next();

    final MutableSignal signal = ringBuffer.get(seqId);

    signal.type = MutableSignal.Type.ERROR;

    signal.value = null;

    signal.error = error;

    ringBuffer.publish(seqId);

    }

    開發者ID:camunda,項目名稱:camunda-bpm-reactor,代碼行數:15,

    示例15: onComplete

    ​點讚 2

    import com.lmax.disruptor.RingBuffer; //導入方法依賴的package包/類

    public static void onComplete(RingBuffer> ringBuffer) {

    final long seqId = ringBuffer.next();

    final MutableSignal signal = ringBuffer.get(seqId);

    signal.type = MutableSignal.Type.COMPLETE;

    signal.value = null;

    signal.error = null;

    ringBuffer.publish(seqId);

    }

    開發者ID:camunda,項目名稱:camunda-bpm-reactor,代碼行數:11,

    示例16: process

    ​點讚 2

    import com.lmax.disruptor.RingBuffer; //導入方法依賴的package包/類

    public void process() throws IOException {

    try (FastBufferedInputStream stream = new FastBufferedInputStream(inputStreamFactory.get())) {

    Preconditions.checkArgument(phaser.getRegisteredParties() == 0);

    phaser.register();

    disruptor.start();

    RingBuffer extends TwoPhaseEvent> ringBuffer = disruptor.getRingBuffer();

    long cursor = ringBuffer.next();

    LineBytesBuffer buffer = ringBuffer.get(cursor).input();

    long lineNo = 0;

    ringBuffer.get(cursor).lineNo(lineNo);

    boolean needToSkipNext = skipFirst;

    while (buffer.readLineFrom(stream)) {

    if (needToSkipNext) {

    needToSkipNext = false;

    continue;

    }

    lineNo++;

    ringBuffer.publish(cursor);

    cursor = ringBuffer.next();

    buffer = ringBuffer.get(cursor).input();

    ringBuffer.get(cursor).lineNo(lineNo);

    }

    disruptor.shutdown();

    phaser.arriveAndAwaitAdvance();

    phaser.arriveAndDeregister();

    } finally {

    afterDisruptorProcessed();

    }

    }

    開發者ID:scaled-ml,項目名稱:Scaled-ML,代碼行數:30,

    示例17: main

    ​點讚 2

    import com.lmax.disruptor.RingBuffer; //導入方法依賴的package包/類

    public static void main(String[] args) {

    final List handlers = new ArrayList<>(NUMBER_CONSUMERS);

    RingBuffer ringBuffer = RingBuffer.createSingleProducer(

    ValueEvent.EVENT_FACTORY, RING_SIZE, new SleepingWaitStrategy());

    start = System.nanoTime();

    //Create consumers

    for(int i = 0; i < NUMBER_CONSUMERS; i++) {

    ValueEventHandler1PMC handler = new ValueEventHandler1PMC(start, handlers);

    handlers.add(handler);

    SequenceBarrier barrier = ringBuffer.newBarrier();

    BatchEventProcessor eventProcessor = new BatchEventProcessor(

    ringBuffer,barrier, handler);

    ringBuffer.addGatingSequences(eventProcessor.getSequence());

    // Each EventProcessor can run on a separate thread

    EXECUTOR.submit(eventProcessor);

    }

    for(int i = 0; i < SAMPLES_SIZE; i++) {

    // Publishers claim events in sequence

    long sequence = ringBuffer.next();

    ValueEvent event = ringBuffer.get(sequence);

    event.setValue(i); // this could be more complex with multiple fields

    // make the event available to EventProcessors

    ringBuffer.publish(sequence);

    }

    }

    開發者ID:iproduct,項目名稱:low-latency-high-throughput,代碼行數:35,

    示例18: channelRead0

    ​點讚 2

    import com.lmax.disruptor.RingBuffer; //導入方法依賴的package包/類

    @Override

    protected void channelRead0(ChannelHandlerContext ctx, O msg) throws Exception {

    Session session = SESSIONS.get(ctx.channel().id());

    if (null == session) {

    return;

    }

    RingBuffer ringBuffer = THREAD_LOCAL.get().getRingBuffer();

    long next = ringBuffer.next();

    try {

    ConcurrentEvent commandEvent = ringBuffer.get(next);

    commandEvent.setValues(newExecutor(session, msg));

    } finally {

    ringBuffer.publish(next);

    }

    }

    開發者ID:ogcs,項目名稱:Okra,代碼行數:16,

    示例19: fire

    ​點讚 2

    import com.lmax.disruptor.RingBuffer; //導入方法依賴的package包/類

    @Override

    public void fire( final Object event ) {

    eventCount.increment();

    RingBuffer ringBuffer = disruptor.getRingBuffer();

    long sequence = ringBuffer.next();

    try {

    StatisticEventHolder holder = ringBuffer.get( sequence );

    holder.set( event );

    holder.setLatch( null );

    } finally {

    ringBuffer.publish( sequence );

    }

    }

    開發者ID:Tetha,項目名稱:bifroest,代碼行數:14,

    示例20: run

    ​點讚 2

    import com.lmax.disruptor.RingBuffer; //導入方法依賴的package包/類

    @Override

    public void run(){

    _logger.info("Node run starting...");

    Executor executor = Executors.newCachedThreadPool();

    Disruptor disruptor = new Disruptor<>(SurfEvent.EVENT_FACTORY, 128, executor);

    disruptor.handleEventsWith(this);

    RingBuffer buffer = disruptor.start();

    VDSEventListImpl events = null;

    while(!_shutdown){

    try{

    events = _eventListPool.borrowObject(0);

    while(events.getEventsList().isEmpty()) {

    _source.read(events);

    }

    if(_needsAck){

    Object obj = _acksource.getInputObject();

    // Hacking this in for now

    // Need to actually ack the object after Kinesis consumes it: TODO

    _acksource.updateInputObject(obj);

    }

    long seq = buffer.next();

    SurfEvent event = buffer.get(seq);

    event.setEventlist(events);

    buffer.publish(seq);

    }

    catch(Exception ex){

    _logger.error("Exception while reading data:", ex);

    ex.printStackTrace();

    }

    }

    }

    開發者ID:InformaticaCorp,項目名稱:Surf,代碼行數:34,

    示例21: publishExchangeOnRingBuffer

    ​點讚 2

    import com.lmax.disruptor.RingBuffer; //導入方法依賴的package包/類

    private void publishExchangeOnRingBuffer(final Exchange exchange,

    final RingBuffer ringBuffer) {

    final long sequence = ringBuffer.next();

    ringBuffer.get(sequence).setExchange(exchange, uniqueConsumerCount);

    ringBuffer.publish(sequence);

    }

    開發者ID:HydAu,項目名稱:Camel,代碼行數:7,

    示例22: tryPublishExchangeOnRingBuffer

    ​點讚 2

    import com.lmax.disruptor.RingBuffer; //導入方法依賴的package包/類

    private void tryPublishExchangeOnRingBuffer(final Exchange exchange, final RingBuffer ringBuffer) throws InsufficientCapacityException {

    final long sequence = ringBuffer.tryNext();

    ringBuffer.get(sequence).setExchange(exchange, uniqueConsumerCount);

    ringBuffer.publish(sequence);

    }

    開發者ID:HydAu,項目名稱:Camel,代碼行數:6,

    示例23: main

    ​點讚 2

    import com.lmax.disruptor.RingBuffer; //導入方法依賴的package包/類

    public static void main(String[] args) {

    RingBuffer ringBuffer = RingBuffer.createSingleProducer(

    ValueEvent.EVENT_FACTORY, RING_SIZE, new SleepingWaitStrategy());

    start = System.nanoTime();

    //Create first consumer

    ValueEventHandler1PMCSequenceFirst handler = new ValueEventHandler1PMCSequenceFirst(start);

    SequenceBarrier barrier;

    barrier = ringBuffer.newBarrier();

    BatchEventProcessor firstEventProcessor = new BatchEventProcessor(

    ringBuffer,barrier, handler);

    //ringBuffer.addGatingSequences(firstEventProcessor.getSequence());

    // Each EventProcessor can run on a separate thread

    EXECUTOR.submit(firstEventProcessor);

    //Create second consumer

    ValueEventHandler1PMCSequenceSecond handler2 = new ValueEventHandler1PMCSequenceSecond(start);

    SequenceBarrier barrier2 = ringBuffer.newBarrier(firstEventProcessor.getSequence());

    BatchEventProcessor secondEventProcessor = new BatchEventProcessor(

    ringBuffer,barrier2, handler2);

    ringBuffer.addGatingSequences(secondEventProcessor.getSequence());

    // Each EventProcessor can run on a separate thread

    EXECUTOR.submit(secondEventProcessor);

    for(int i = 0; i < SAMPLES_SIZE; i++) {

    // Publishers claim events in sequence

    long sequence = ringBuffer.next();

    ValueEvent event = ringBuffer.get(sequence);

    event.setValue(i); // this could be more complex with multiple fields

    // make the event available to EventProcessors

    ringBuffer.publish(sequence);

    }

    }

    開發者ID:iproduct,項目名稱:low-latency-high-throughput,代碼行數:42,

    注:本文中的com.lmax.disruptor.RingBuffer.publish方法示例整理自Github/MSDocs等源碼及文檔管理平台,相關代碼片段篩選自各路編程大神貢獻的開源項目,源碼版權歸原作者所有,傳播和使用請參考對應項目的License;未經允許,請勿轉載。

    展开全文
    weixin_29195137 2021-02-26 19:04:28
  • 4KB qq_35620754 2020-06-12 17:23:27
  • //写入次数缓冲区大小,每次的实际内容大小不固定 string[] RingBuffer = new string[C_BUFFER_SIZE]; int writedTimes = 0; 变量writedTimes 记录写入次数,它会一直递增,不过为了线程安全的递增且不使用托管锁,...

    最近常收到SOD框架的朋友报告的SOD的SQL日志功能报错:文件句柄丢失。经过分析得知,这些朋友使用SOD框架开发了访问量比较大的系统,由于忘记关闭SQL日志功能所以出现了很高频率的日志写入操作,从而偶然引起错误。后来我建议只记录出错的或者执行时间较长的SQL信息,暂时解决了此问题。但是作为一个热心造轮子的人,一定要看看能不能造一个更好的轮子出来。

    前面说的错误原因已经很直白了,就是频繁的日志写入导致的,那么解决方案就是将多次写入操作合并成一次写入操作,并且采用异步写入方式。要保存多次操作的内容就要有一个类似“队列”的东西来保存,而一般的线程安全的队列,都是“有锁队列”,在性能要求很高的系统中,不希望在日志记录这个地方耗费多一点计算资源,所以最好有一个“无锁队列”,因此最佳方案就是Ring Buffer(环形缓冲区)了。

    什么是Ring Buffer?顾名思义,就是一个内存环,每一次读写操作都循环利用这个内存环,从而避免频繁分配和回收内存,减轻GC压力,同时由于Ring Buffer可以实现为无锁的队列,从而整体上大幅提高系统性能。Ring Buffer的示意图如下,有关具体原理,请参考此文《Ring Buffer 有什么特别?》。

    689b5ec31a15e1da15a1e11d2421c827.png

    上文并没有详细说明如何具体读写Ring Buffer,但是原理介绍已经足够我们怎么写一个Ring Buffer程序了,接下来看看我在 .NET上的实现。

    首先,定一个存放数据的数组,记住一定要用数组,它是实现Ring Buffer的关键并且CPU友好。

    const int C_BUFFER_SIZE = 10;//写入次数缓冲区大小,每次的实际内容大小不固定

    string[] RingBuffer = new string[C_BUFFER_SIZE];

    int writedTimes = 0;

    变量writedTimes 记录写入次数,它会一直递增,不过为了线程安全的递增且不使用托管锁,需要使用原子锁Interlocked。之后,根据每次 writedTimes 跟环形缓冲区的大小求余数,得到当前要写入的数组位置:

    void SaveFile(string fileName, stringtext)

    {int currP= Interlocked.Increment(refwritedTimes);int writeP= currP %C_BUFFER_SIZE ;int index = writeP == 0 ? C_BUFFER_SIZE - 1 : writeP - 1;

    RingBuffer[index]= "Arr[" + index + "]:" +text;

    }

    Ring Buffer的核心代码就这么点,调用此方法,会一直往缓冲区写入数据而不会“溢出”,所以写入Ring Buffer效率很高。

    一个队列如果只生产不消费肯定不行的,那么如何及时消费Ring Buffer的数据呢?简单的方案就是当Ring Buffer“写满”的时候一次性将数据“消费”掉。注意这里的“写满”仅仅是指写入位置 index达到了数组最大索引位置,而“消费”也不同于常见的堆栈,队列等数据结构,只是读取缓冲区的数据而不会移除它。

    所以前面的代码只需要稍加改造:

    void SaveFile(string fileName, stringtext)

    {int currP= Interlocked.Increment(refwritedTimes);int writeP= currP %C_BUFFER_SIZE ;int index = writeP == 0 ? C_BUFFER_SIZE - 1 : writeP - 1;

    RingBuffer[index]= "Arr[" + index + "]:" +text;if (writeP == 0)

    {string result = string.Concat( RingBuffer);

    FlushFile(fileName, result);

    }

    }

    writeP == 0 表示当前一轮的缓冲区已经写满,然后调用函数 FlushFile 将Ring Buffer的数据连接起来,整体写入文件。

    void FlushFile(string fileName, stringtext)

    {using (FileStream fs = new FileStream(fileName, FileMode.Append, FileAccess.Write, FileShare.Write, 2048, FileOptions.Asynchronous))

    {byte[] buffer =System.Text.Encoding.UTF8.GetBytes(text);

    IAsyncResult writeResult= fs.BeginWrite(buffer, 0, buffer.Length,

    (asyncResult)=>{

    fs.EndWrite(asyncResult);

    },

    fs);//fs.EndWrite(writeResult);//这种方法异步起不到效果

    fs.Flush();

    }

    }

    在函数 FlushFile 中我们使用了异步写入文件的技术,注意 FileOptions.Asynchronous ,使用它才可以真正利用Windows的完成端口IOCP,将文件异步写入。

    当然这段代码也可以使用.NET最新版本支持的 async/await ,不过我要让SOD框架继续支持.NET 2.0,所以只好这样写了。

    现在,我们可以开多线程来测试这个循环队列效果怎么样:

    Task[] arrTask = new Task[20];for (int i = 0; i < arrTask.Length; i++)

    {

    arrTask[i]= new Task(obj => SaveFile( (int)obj) ,i);

    }for (int i = 0; i < arrTask.Length; i++)

    {

    arrTask[i].Start();

    }

    Task.WaitAll(arrTask);

    MessageBox.Show(arrTask.Length+"Task All OK.");

    这里开启20个Task任务线程来写入文件,运行此程序,发现20个线程才写入了10条数据,分析很久才发现,文件异步IO太快的话,会有缓冲区丢失,第一次写入的10条数据无法写入文件,多运行几次就没有问题了。所以还是得想法解决此问题。

    通常情况下我们都是使用托管锁来解决这种并发问题,但本文的目的就是要实现一个“无锁环形缓冲区”,不能在此“功亏一篑”,所以此时“信号量”上场了。

    同步可以分为锁定和信号同步,信号同步机制中涉及的类型都继承自抽象类WaitHandle,这些类型有EventWaitHandle(类型化为AutoResetEvent、ManualResetEvent)、Semaphore以及Mutex。见下图:

    b90ce8233de6f0733d8cb0335bd187bd.png

    首先声明一个 ManualResetEvent对象:

    ManualResetEvent ChangeEvent = new ManualResetEvent(true);

    这里我们将 ManualResetEvent 对象设置成 “终止状态”,意味着程序一开始是允许所有线程不等待的,当我们需要消费Ring Buffer的时候再将  ManualResetEvent 设置成“非终止状态”,阻塞其它线程。简单说就是当要写文件的时候将环形缓冲区阻塞,直到文件写完才允许继续写入环形缓冲区。

    对应的新的代码调整如下:

    void SaveFile(string fileName, stringtext)

    {

    ChangeEvent.WaitOne();int currP= Interlocked.Increment(refwritedTimes);int writeP= currP %C_BUFFER_SIZE ;int index = writeP == 0 ? C_BUFFER_SIZE - 1 : writeP - 1;

    RingBuffer[index]= "Arr[" + index + "]:" +text;if (writeP == 0)

    {

    ChangeEvent.Reset();string result = string.Concat( RingBuffer);

    FlushFile(fileName, result);

    }

    }

    然后,再FlushFile 方法的 回掉方法中,加入设置终止状态的代码,部分代码如下:

    (asyncResult) =>{

    fs.EndWrite(asyncResult);

    ChangeEvent.Set();

    }

    OK,现在我们的程序具备高性能的安全的写入日志文件的功能了,我们来看看演示程序测试的日志结果实例:

    Arr[0]:Thread index:0--FFFFFFF

    Arr[1]:Thread index:1--FFFFFFF

    Arr[2]:Thread index:8--FFFFFFF

    Arr[3]:Thread index:9--FFFFFFF

    Arr[4]:Thread index:3--FFFFFFF

    Arr[5]:Thread index:2--FFFFFFF

    Arr[6]:Thread index:4--FFFFFFF

    Arr[7]:Thread index:10--FFFFFFF

    Arr[8]:Thread index:5--FFFFFFF

    Arr[9]:Thread index:6--FFFFFFF

    Arr[0]:Thread index:7--FFFFFFF

    Arr[1]:Thread index:11--FFFFFFF

    Arr[2]:Thread index:12--FFFFFFF

    Arr[3]:Thread index:13--FFFFFFF

    Arr[4]:Thread index:14--FFFFFFF

    Arr[5]:Thread index:15--FFFFFFF

    Arr[6]:Thread index:16--FFFFFFF

    Arr[7]:Thread index:17--FFFFFFF

    Arr[8]:Thread index:18--FFFFFFF

    Arr[9]:Thread index:19--FFFFFFF

    测试结果符合预期!

    到此,我们今天的主题就全部介绍完成了,不过要让本文的代码能够符合实际的运行,还要解决每次只写入少量数据并且将它定期写入日志文件的问题,这里贴出真正的局部代码:

    f9a069730c2849bd7687110b1f55617c.png

    PS:有朋友说采用信号量并不能完全保证程序安全,查阅了MSDN也说如果信号量状态改变还没有来得及应用,那么是起不到作用的,所以还需要检查业务状态标记,也就是在设置非终止状态后,马上设置一个操作标记,在其它线程中,需要检查此标记,以避免“漏网之鱼”引起不期望的结果。

    再具体实现上,我们可以实现一个“自旋锁”,循环检查此状态标记,为了防止发生死锁,还需要有锁超时机制,代码如下:

    void SaveFile(string fileName, stringtext)

    {

    ChangeEvent.WaitOne(10000);int currP= Interlocked.Increment(refWritedTimes);int writeP= currP %C_BUFFER_SIZE ;int index = writeP == 0 ? C_BUFFER_SIZE - 1 : writeP - 1;if (writeP == 0)

    {

    ChangeEvent.Reset();

    IsReading= true;

    RingBuffer[index]= "Arr[" + index + "]:" +text;

    LastWriteTime=DateTime.Now;

    WritingIndex= 0;

    SaveFile(fileName,RingBuffer);

    }else if (DateTime.Now.Subtract(LastWriteTime).TotalSeconds >C_WRITE_TIMESPAN)

    {

    ChangeEvent.Reset();

    IsReading= true;

    RingBuffer[index]= "Arr[" + index + "]:" +text;int length = index - WritingIndex + 1;if (length <= 0)

    length= 1;string[] newArr = new string[length];

    Array.Copy(RingBuffer, WritingIndex, newArr,0, length);

    LastWriteTime=DateTime.Now;

    WritingIndex= index + 1;

    SaveFile(fileName, newArr);

    }else{//防止漏网之鱼的线程在信号量产生作用之前修改数据//采用“自旋锁”等待

    int count = 0;while(IsReading)

    {if (count++ > 10000000)

    {

    Thread.Sleep(50);break;

    }

    }

    RingBuffer[index]= "Arr[" + index + "]:" +text;

    }

    }

    完整的Ring Buffer代码会在最新版本的SOD框架源码中,有关本篇文章测试程序的完整源码,请加QQ群讨论获取,

    群号码:SOD框架高级群 18215717 ,加群请注明 PDF.NET技术交流 ,否则可能被拒绝。

    展开全文
    weixin_39725403 2021-02-26 19:05:11
  • 249KB mfc_mfc 2018-12-17 11:21:35
  • 4星
    7KB qq_20636241 2015-09-01 12:49:27
  • weixin_29592699 2021-05-16 18:56:46
  • 16KB weixin_38744153 2019-09-18 11:16:23
  • justsomebody126 2020-01-10 11:01:19
  • newbmiao 2021-09-02 01:07:05
  • weixin_39728909 2021-01-26 18:09:40
  • u014029783 2020-07-27 22:13:11
  • tugouxp 2021-06-14 15:11:53
  • luckydarcy 2020-07-13 00:42:24
  • xueyushenzhou 2021-07-24 19:34:59
  • 1KB weixin_42119866 2021-03-25 16:06:41
  • qq_31443653 2018-10-29 09:57:05
  • 9KB weixin_42117267 2021-06-14 23:01:37
  • weixin_35997503 2021-05-17 18:51:45
  • wteruiycbqqvwt 2019-09-09 16:29:14
  • adgentleman 2019-04-12 16:51:32
  • qq_43277087 2020-01-20 09:11:20

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 21,138
精华内容 8,455
关键字:

ringbuffer

友情链接: telnet.zip