Disruptor.Net - IEventTranslator

IEventTranslator

IEventTranslator是在第3版發布的新功能,這篇文章是介紹使用的方式及設計理念

生產者發送事件到RingBuffer

事件驅動的一個特性就是透過發送事件給關注的訂閱者,進而讓多個不同的業務邏輯得以實現,以Disruptor的設計,就是要透過RingBuffer.Publish()這個方法把事件放入RingBuffer,後面的EventHandler則會依據其掛載的順序,由SequenceBarrier調度執行。

在較早期的版本與範例程式碼,發送Event都是先呼叫RingBuffer.Next()取得最新可用的sequence,然後依sequence取得指定的slot中的物件,設定完要傳遞的屬性後,要再呼叫RingBuffer.Publish()方法把事件發送出去,大致如以下的程式碼。

var sequence = _ringBuffer.Next();
_ringBuffer[sequence].Value = i;
_ringBuffer.Publish(sequence);

但這段程式碼有個問題,假如先取得了sequence,但在Publish之前發生Exception會發生什麼事?

停滯的RingBuffer

RingBuffer因其特殊的設計,在使用上比傳統的Queue來的複雜,這段程式碼如果不加上try catch/finally,確保最終要把這個sequence發布到RingBuffer,將造成整個RingBuffer停滯不前,必須重啟才能恢復,這在MultiProducer特別容易發生,對一個In-Memory Transaction系統來說,是個非常致命的問題。

所以這段程式碼必須要改成這樣,因而增加了程式碼的複雜度

var sequence = 0L;
try
{
    sequence = _ringBuffer.Next();
    _ringBuffer[sequence].Value = i;
}
finally
{
    _ringBuffer.Publish(sequence);
}

因此在第3版的Disruptor,LMAX的團隊為簡化這些使用上的複雜性,有了新的RingBuffer.PublishEvent()及對應的IEventTranslator系列介面。

PublishEvent()已內建了try catch/finally,只要實作IEventTranslator介面,就可以不再需要自己處理這段程式碼,發送事件只要一行程式碼就可以搞定了。

_ringBuffer.PublishEvent(Translator.Instance, message.EventType, message.EventData);

程式碼更簡潔,此外也帶來其他額外的好處,EventTranslator很容易撰寫單元測試,也可以依據需求撰寫各種不同的EventTranslator進行抽換。

從設計的角度來看EventTranslator

DDD有一個很核心的概念叫Bounded Context,意思是整個Domain可以被劃分出不同的Domain,各有各自的邊界,同樣的事件在不同的Context可能被賦與不同的意義,而不同的Context會有上下游的協作方式,中間可能透過各種方式協作。

而以事件驅動的設計理念,上下游的Context透過事件進行協作,此時單純的以.Publish()發送事件就有可能發生一個問題,不同的Context定義的相同事件內容並不完全一致,且隨著需求演變,事件攜帶的資訊可能也需要擴展,有時為了一些相容性(已儲存的事件結構與新的不同),就會有一些做法容易造成程式碼腐化,而EventTranslator的出現,很好的解決了這個問題。

使用方式

IEventTranslator有多個多載,可以依需求決定實作哪一個介面

IEventTranslatorOneArg<T, A>
IEventTranslatorTwoArg<T, A, B>
IEventTranslatorThreeArg<T, A, B, C>
IEventTranslatorVararg<T>

使用的方法都一樣,最後我附上一個實作IEventTranslatorTwoArg的Producer程式碼,應該很容易理解如何使用。

public class Producer
{
    private readonly RingBuffer<EventMessage> _ringBuffer;

    public Producer(RingBuffer<EventMessage> ringBuffer)
    {
        _ringBuffer = ringBuffer;
    }

    public void OnData(EventMessage message)
    {
        _ringBuffer.PublishEvent(Translator.Instance, message.EventType, message.EventData);
    }

    private class Translator : IEventTranslatorTwoArg<EventMessage, EventType, PayloadInfo>
    {
        public static readonly Translator Instance = new Translator();

        private Translator(){}

        public void TranslateTo(EventMessage @event, long sequence, EventType eventType, PayloadInfo payload)
        {
            @event.EventType = eventType;
            if (eventType == EventType.OrderPlaced)
            {
                @event.EventData.Order = payload.Order;
            }
        }
    }
}
上篇多執行緒程式的效能隱形殺手 - False sharing
下篇Redis系列 - C#存取Redis (下)