Subject/Observer is dual to Iterator

Erik “Head in the Box” Meijer, Microsoft Corporation

The concept of duality is a very powerful trick that provides “buy one, get one free” in mathematics and engineering. For example, the well-known De Morgan's laws exploit the duality between conjunction && and disjunction || to show how negation ! distributes over conjunction and disjunction, turning each into the other:

!(a && b) == (!a || !b)

!(a || b) == (!a && !b)
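Because each variable ranges over only two values, both laws can be checked exhaustively. The following is a minimal sketch; the class name DeMorgan is illustrative. In C# the right-hand sides need parentheses, because == binds more tightly than && and ||.

```csharp
using System;

// Hypothetical helper class: exhaustively checks De Morgan's laws for C# booleans.
public static class DeMorgan
{
    // Returns true if both laws hold for every combination of a and b.
    public static bool HoldsForAllInputs()
    {
        bool[] values = { false, true };
        foreach (bool a in values)
        {
            foreach (bool b in values)
            {
                // Parentheses are required: == has higher precedence than && and ||.
                bool law1 = !(a && b) == (!a || !b);
                bool law2 = !(a || b) == (!a && !b);
                if (!law1 || !law2) return false;
            }
        }
        return true;
    }
}
```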

Other examples in computer science are the duality between call-by-value and call-by-name, between induction and co-induction, between least and greatest fixed points, and between algebras and co-algebras.

We discovered that the archetypal enumerable collections that expose IEnumerable<T> and IEnumerator<T> are dual to observable collections that expose IObservable<T> and IObserver<T>. The former embody the protocol for interactive computations where the consumer synchronously pulls data from the producer, while the latter embody the protocol for reactive computations where the producer asynchronously pushes data to the consumer. As far as we are aware, the fact that the Observer and Iterator design patterns are mathematical duals has not been observed before; the standard design pattern literature does not even list Iterator and Observer as related patterns.

To derive observable reactive collections using duality, we start with the well-known Iterator design pattern, as embodied in the .NET framework via the pair of IEnumerable<out T> and IEnumerator<out T> interfaces (Java has a very similar pair of interfaces Iterable<T> and Iterator<T>):

interface IEnumerable<out T>
{
    IEnumerator<T> GetEnumerator();
}

interface IEnumerator<out T> : IDisposable
{
    bool MoveNext();   // throws Exception
    T Current { get; }
}

We ignore the Reset method and the legacy non-generic base interfaces IEnumerable and IEnumerator. Note that besides returning a proper value, the MoveNext method can also throw an exception (in the equivalent Java Iterator interface this fact is apparent from the documented throws clause, strangely enough on the next() method).
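The pull protocol can be made concrete by expanding a foreach loop by hand. This sketch is a simplified version of what the C# compiler generates for foreach (the real expansion differs in details); PullDemo, Sum, and Faulty are illustrative names. The consumer pulls with MoveNext, reads Current, and disposes the enumerator when done, even when MoveNext throws.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper class illustrating the pull (IEnumerator) protocol.
public static class PullDemo
{
    // Roughly what `foreach (var x in source) sum += x;` expands to.
    public static int Sum(IEnumerable<int> source)
    {
        int sum = 0;
        IEnumerator<int> e = source.GetEnumerator();
        try
        {
            // The consumer pulls: MoveNext returns true/false, or throws.
            while (e.MoveNext())
            {
                sum += e.Current;
            }
        }
        finally
        {
            // Disposing the enumerator undoes the effect of GetEnumerator.
            e.Dispose();
        }
        return sum;
    }

    // Hypothetical iterator whose second MoveNext call throws.
    public static IEnumerable<int> Faulty()
    {
        yield return 1;
        throw new InvalidOperationException("source failed");
    }
}
```

When MoveNext throws, the exception surfaces in the consumer's loop, which is the pull-side counterpart of the OnError notification derived below.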

Taking the definition of categorical duality from Wikipedia literally, we completely mechanically swap the arguments and results of all method signatures. The only small twist is that we dualize just the IEnumerator interface, but leave the IDisposable aspect of IEnumerator that is returned from GetEnumerator invariant. Doing this, we arrive at the following two interfaces:

interface IObservable<out T>
{
    IDisposable Subscribe(IObserver<T> observer);
}

interface IObserver<in T>
{
    void OnCompleted(bool done);
    void OnError(Exception exception);
    T OnNext { set; }
}

We streamline these unthinkingly derived interfaces in two ways. First, the (implicit) protocol for IEnumerator is that the environment signals that there will be no more values by returning false from MoveNext. Dualizing that operational behavior makes the boolean argument of OnCompleted redundant: by just calling OnCompleted (without any arguments), the environment signals that there will be no more values. Second, instead of a write-only property, we turn OnNext into a method.

Surprisingly, after this refactoring, we obtain a pair of interfaces for the familiar Observer design pattern:

interface IObservable<out T>
{
    IDisposable Subscribe(IObserver<T> observer);
}

interface IObserver<in T>
{
    void OnCompleted();
    void OnError(Exception exception);
    void OnNext(T value);
}
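To see the push protocol in action, here is a hand-written producer and consumer written against the System.IObservable<T> and System.IObserver<T> interfaces that ship in the .NET base class library (since .NET Framework 4) with exactly the shape derived above. RangeObservable and RecordingObserver are illustrative names, not part of any library. The producer now runs the loop and calls OnNext, the dual of a consumer calling MoveNext.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical producer: pushes start, start + 1, ..., start + count - 1, then completes.
public sealed class RangeObservable : IObservable<int>
{
    private readonly int _start;
    private readonly int _count;

    public RangeObservable(int start, int count) { _start = start; _count = count; }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        // The producer drives the loop: each OnNext is the dual of the consumer calling MoveNext.
        for (int i = 0; i < _count; i++)
        {
            observer.OnNext(_start + i);
        }
        observer.OnCompleted(); // dual of MoveNext returning false
        // Everything was pushed synchronously, so there is nothing left to cancel.
        return EmptyDisposable.Instance;
    }

    private sealed class EmptyDisposable : IDisposable
    {
        public static readonly EmptyDisposable Instance = new EmptyDisposable();
        public void Dispose() { }
    }
}

// Hypothetical consumer that records every notification it receives.
public sealed class RecordingObserver<T> : IObserver<T>
{
    public readonly List<string> Log = new List<string>();
    public void OnNext(T value) { Log.Add("OnNext(" + value + ")"); }
    public void OnError(Exception error) { Log.Add("OnError(" + error.Message + ")"); }
    public void OnCompleted() { Log.Add("OnCompleted()"); }
}
```

A producer that pushes asynchronously would instead return a handle whose Dispose stops the pending work.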

Interestingly, the interfaces that we derived guided by mathematics are even better than the typical observer/observable types. Operationally, for enumerable collections, we undo the effect of calling GetEnumerator() by disposing the returned enumerator. For observable collections, we undo the effect of Subscribe-ing an observer by disposing the handle that is returned from the subscription. This seemingly simple technique is essential for composing operations on observable collections, which is hard to achieve with the add/remove mechanism of .NET events or with the usual void deleteObserver(Observer o) method for removing observers from an observable.

For example, consider the filter combinator IObservable<T> Where<T>(this IObservable<T> source, Func<T, bool> predicate), which returns a new observable (using the factory method Observable.Create) that, when you subscribe to it, subscribes to the source with an anonymous observer (using the factory method Observer.Create) that filters out all values for which the predicate does not hold:

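public static IObservable<T> Where<T>(this IObservable<T> source, Func<T, bool> predicate)
{
    return Observable.Create<T>(observer =>
    {
        // The disposable returned by the source doubles as our own unsubscribe handle.
        return source.Subscribe(Observer.Create<T>(
            value =>
            {
                bool keep;
                try { keep = predicate(value); }
                catch (Exception e) { observer.OnError(e); return; }
                if (keep) observer.OnNext(value);
            },
            observer.OnError,        // forward errors from the source
            observer.OnCompleted));  // forward completion of the source
    });
}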

Because subscribing to an observable returns a disposable object for unsubscribing, the above implementation is completely stateless: there is no need for the Where combinator to keep track of its observers so that they can be removed later. That responsibility is simply delegated to the source observable, which ultimately bottoms out in a stateful object, such as a .NET event source, that maintains a list of observers:

public static IObservable<IEvent<MouseEventArgs>> GetMouseMoves(this Control control)
{
    return Observable.Create<IEvent<MouseEventArgs>>(observer =>
    {
        MouseEventHandler handler = (s, e) => observer.OnNext(Event.Create(s, e));
        control.MouseMove += handler;
        // Unsubscribing removes the handler from the event source.
        return () => { control.MouseMove -= handler; };
    });
}
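The two ideas above, a stateless Where and an event source that owns the observer list, can be reproduced without Rx using only the BCL's System.IObservable<T> and System.IObserver<T>. In this sketch AnonymousObservable, AnonymousObserver, ActionDisposable, MiniRx, and Ticker are hypothetical helpers standing in for Rx's Observable.Create, Observer.Create, and a control's event; they are not Rx's implementation. Disposing the outer subscription flows through Where to the event source.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for Observable.Create: the subscribe function returns the unsubscribe handle.
public sealed class AnonymousObservable<T> : IObservable<T>
{
    private readonly Func<IObserver<T>, IDisposable> _subscribe;
    public AnonymousObservable(Func<IObserver<T>, IDisposable> subscribe) { _subscribe = subscribe; }
    public IDisposable Subscribe(IObserver<T> observer) { return _subscribe(observer); }
}

// Hypothetical stand-in for Observer.Create.
public sealed class AnonymousObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    private readonly Action<Exception> _onError;
    private readonly Action _onCompleted;
    public AnonymousObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        _onNext = onNext; _onError = onError; _onCompleted = onCompleted;
    }
    public void OnNext(T value) { _onNext(value); }
    public void OnError(Exception error) { _onError(error); }
    public void OnCompleted() { _onCompleted(); }
}

// Hypothetical helper: runs an action the first time it is disposed.
public sealed class ActionDisposable : IDisposable
{
    private Action _dispose;
    public ActionDisposable(Action dispose) { _dispose = dispose; }
    public void Dispose()
    {
        var action = _dispose;
        _dispose = null;
        if (action != null) action();
    }
}

// Hypothetical mini-library; not Rx.
public static class MiniRx
{
    // Bridges a .NET event to IObservable<T>; the event itself is the stateful observer list.
    public static IObservable<T> FromEvent<T>(Action<Action<T>> addHandler, Action<Action<T>> removeHandler)
    {
        return new AnonymousObservable<T>(observer =>
        {
            Action<T> handler = observer.OnNext;
            addHandler(handler);
            return new ActionDisposable(() => removeHandler(handler));
        });
    }

    // Stateless filter: keeps no observer list; its handle is simply the source's handle.
    // (Unlike Rx, this sketch does not unsubscribe from the source after a predicate error.)
    public static IObservable<T> Where<T>(this IObservable<T> source, Func<T, bool> predicate)
    {
        return new AnonymousObservable<T>(observer =>
            source.Subscribe(new AnonymousObserver<T>(
                value =>
                {
                    bool keep;
                    try { keep = predicate(value); }
                    catch (Exception e) { observer.OnError(e); return; }
                    if (keep) observer.OnNext(value);
                },
                observer.OnError,
                observer.OnCompleted)));
    }
}

// Hypothetical event source standing in for a control's MouseMove event.
public sealed class Ticker
{
    public event Action<int> Tick;
    public void Raise(int value)
    {
        var handler = Tick;
        if (handler != null) handler(value);
    }
}
```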

Based on the IObserver<T> and IObservable<T> interfaces we derived above, we have built a monadic LINQ-based framework for reactive programming called Rx. It is interesting to compare Rx with the traditional approach to reactive programming in the functional language community. As the Wikipedia entry mentions, “the semantic model of FRP […] is typically in terms of continuous functions […] over time”. Even discrete events are typically associated with time, and as a result of that choice (we believe) efficient implementations of FRP are still an open research problem.

In contrast, Rx completely decouples the notion of reactivity from the notion of time: instead of being parameterized over time, Rx is parameterized over concurrency. Not only are pull-based enumerable streams and push-based observable streams each other’s dual, they are in fact isomorphic. When converting a pull-based stream into a push-based stream, we need to add concurrency to prevent the Subscribe method from blocking while the values from the source stream are pushed onto the target stream. Dually, when converting a push-based stream into a pull-based stream, we need to remove concurrency, blocking the MoveNext call until the source has pushed the next value. We represent execution contexts with an abstraction similar to Java’s ScheduledExecutorService interface:

interface IScheduler
{
    DateTimeOffset Now { get; }                              // local absolute time
    IDisposable Schedule(Action work);                       // work will execute eventually
    IDisposable Schedule(Action work, TimeSpan dueTime);     // work will execute after dueTime
}
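The two conversions described above can be sketched without Rx. In this illustrative sketch, ToPushStream adds concurrency by draining the enumerable on a dedicated background thread (standing in for scheduling the work on an IScheduler), and ToPullStream removes concurrency by buffering pushed values in a BlockingCollection<T> so that MoveNext blocks until the next value has arrived. Conversions, ThreadObservable, BufferObserver, and StopFlag are hypothetical names; Rx's own ToObservable and ToEnumerable operators are implemented differently.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical conversions between pull-based and push-based streams.
public static class Conversions
{
    // Pull -> push: add concurrency so Subscribe returns while values are still being pushed.
    public static IObservable<T> ToPushStream<T>(this IEnumerable<T> source)
    {
        return new ThreadObservable<T>(source);
    }

    // Push -> pull: remove concurrency by blocking MoveNext until the next value is pushed.
    public static IEnumerable<T> ToPullStream<T>(this IObservable<T> source)
    {
        var buffer = new BlockingCollection<T>();
        var observer = new BufferObserver<T>(buffer);
        using (source.Subscribe(observer))
        {
            // Blocks until an item is available or CompleteAdding has been called.
            foreach (var item in buffer.GetConsumingEnumerable())
            {
                yield return item;
            }
        }
        if (observer.Error != null) throw observer.Error; // surface the error on the pulling side
    }
}

// Hypothetical handle: disposing it asks the producer thread to stop.
sealed class StopFlag : IDisposable
{
    private volatile bool _stopped;
    public bool IsStopped { get { return _stopped; } }
    public void Dispose() { _stopped = true; }
}

// Hypothetical observable that pulls from an enumerable on its own thread and pushes the values.
sealed class ThreadObservable<T> : IObservable<T>
{
    private readonly IEnumerable<T> _source;
    public ThreadObservable(IEnumerable<T> source) { _source = source; }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        var stop = new StopFlag();
        var worker = new Thread(() =>
        {
            using (var e = _source.GetEnumerator())
            {
                while (!stop.IsStopped)
                {
                    bool hasNext;
                    T current = default(T);
                    try
                    {
                        hasNext = e.MoveNext();                   // pull...
                        if (hasNext) current = e.Current;
                    }
                    catch (Exception ex) { observer.OnError(ex); return; } // MoveNext threw
                    if (!hasNext) { observer.OnCompleted(); return; }      // MoveNext returned false
                    observer.OnNext(current);                              // ...and push
                }
            }
        });
        worker.IsBackground = true;
        worker.Start();
        return stop; // Subscribe returns immediately
    }
}

// Hypothetical observer that feeds pushed values into a blocking buffer.
sealed class BufferObserver<T> : IObserver<T>
{
    private readonly BlockingCollection<T> _buffer;
    private volatile Exception _error;
    public BufferObserver(BlockingCollection<T> buffer) { _buffer = buffer; }
    public Exception Error { get { return _error; } }
    public void OnNext(T value) { _buffer.Add(value); }
    public void OnError(Exception error) { _error = error; _buffer.CompleteAdding(); }
    public void OnCompleted() { _buffer.CompleteAdding(); }
}
```

Note how the producer thread maps the three outcomes of MoveNext (true, false, throw) onto OnNext, OnCompleted, and OnError.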

It is still an open question whether, and how, the math we used to derive IObservable<T> from IEnumerable<T> dictates the essential role of concurrency. We are also hoping for a similar stroke of lightning for IScheduler as the one we experienced when we discovered IObservable<T>.

Join Patterns using Rx

One of the most difficult tasks using standard .NET events is defining complex synchronization patterns between multiple event sources, where new composite events are created as a result of the occurrence of certain combinations of other events.

The Joins class in Rx provides a basic implementation of the Join calculus over observable collections, which allows for concise descriptions of complex patterns that match notifications on multiple inputs. For instance, let’s look at the JoCaml example of a concurrent counter whose value can be incremented or read by multiple threads. The example uses a common technique: a private channel holds the exclusive state of the counter, and two bidirectional channels represent the Inc and Get methods of the counter.

We model unidirectional channels with values of type T using an ISubject<T> whose IObservable<out T> part represents the output port of the channel and the IObserver<in T> part represents the input port of the channel. In this case, the channel contains an integer value, so we declare _counter as follows:

readonly ISubject<int> _counter = new Subject<int>();


We model bidirectional, or duplex, channels with values of type T using a subject that contains a subject whose “continuation port” of type IObserver<T> is used to send back the return messages, and whose IObservable<T> part is used to receive the returned messages.

readonly ISubject<ISubject<Unit>> _inc = new Subject<ISubject<Unit>>();

readonly ISubject<ISubject<int>>  _get = new Subject<ISubject<int>>();


We could tighten up the typing by declaring separate variables for the input and output ports. For instance, the output port of the _get channel only needs to be exposed as an output port that carries input ports, IObservable<IObserver<int>>. However, insisting on the strictest typing would only hide the inherent symmetry between unidirectional and bidirectional channels.

The three channels are wired up using a join pattern to guarantee that the counter is exclusively used to read the current value, or to increment the current value.

·        In the first case, the join pattern _counter.And(_get) becomes active when both the _counter and the _get channels contain a message, in which case the unchanged value n of the counter is sent back to the _counter channel via _counter.OnNext(n), and the current value n is also sent to the input port carried by _get via result.OnNext(n).

·        In the second case, the join pattern _counter.And(_inc) becomes active when both the _counter and the _inc channels contain a message, in which case the value n of the counter is incremented and sent back to the _counter channel via _counter.OnNext(n + 1), and the result channel carried by _inc is notified using result.OnNext().

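Observable.Join(
    _counter.And(_get).Then((n, result) =>
    {
        _counter.OnNext(n);      // put the unchanged state back
        result.OnNext(n);        // reply to the caller of Get
        result.OnCompleted();    // an AsyncSubject only publishes once completed
    }),
    _counter.And(_inc).Then((n, result) =>
    {
        _counter.OnNext(n + 1);  // put the incremented state back
        result.OnNext();         // acknowledge the caller of Inc
        result.OnCompleted();
    }))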

The join pattern above is created and started in the constructor of the counter, which also stores the resulting subscription so that disposing the counter tears it down, and which sends the initial value of the counter on the _counter channel (see the complete code below, which also contains a few simple helper functions that make the use of the Unit type a little more palatable).

The Inc and Get methods of the counter class are now rather straightforward. Both create a return channel using an AsyncSubject, notify their respective channels of the state machine with that channel and synchronously wait for the result to come back using First().

public void Inc()
{
    var result = new AsyncSubject<Unit>();
    _inc.OnNext(result);
    result.First();
}

public int Get()
{
    var result = new AsyncSubject<int>();
    _get.OnNext(result);
    return result.First();
}

The complete implementation of a concurrent counter using Rx join patterns then looks like this.

using System;
using System.Collections.Generic;
using System.Linq;

namespace Counter
{
    class Counter : IDisposable
    {
        // Duplex channels for the Inc and Get "methods" (each message carries a reply channel).
        readonly ISubject<ISubject<Unit>> _inc = new Subject<ISubject<Unit>>();
        readonly ISubject<ISubject<int>> _get = new Subject<ISubject<int>>();
        // Private channel holding the exclusive state of the counter.
        readonly ISubject<int> _counter = new Subject<int>();
        readonly IDisposable _dispose;

        public void Inc()
        {
            var result = new AsyncSubject<Unit>();
            _inc.OnNext(result);
            result.First();          // block until the join pattern acknowledges
        }

        public int Get()
        {
            var result = new AsyncSubject<int>();
            _get.OnNext(result);
            return result.First();   // block until the join pattern replies
        }

        public Counter(int init)
        {
            _dispose = Observable.Join(
                _counter.And(_get).Then((n, result) =>
                {
                    _counter.OnNext(n);
                    result.OnNext(n);
                    result.OnCompleted();   // an AsyncSubject only publishes once completed
                }),
                _counter.And(_inc).Then((n, result) =>
                {
                    _counter.OnNext(n + 1);
                    result.OnNext();
                    result.OnCompleted();
                })).Subscribe();
            _counter.OnNext(init);   // put the initial state on the private channel
        }

        public void Dispose()
        {
            _dispose.Dispose();
        }
    }

    public static class Helpers
    {
        // Sends a Unit value, so callers can write result.OnNext().
        public static void OnNext(this IObserver<Unit> src)
        {
            src.OnNext(new Unit());
        }

        // Lets a join pattern end in an Action instead of a Func returning Unit.
        public static Plan<Unit> Then<S, T>(this Pattern<S, T> p, Action<S, T> selector)
        {
            return p.Then((first, second) =>
            {
                selector(first, second);
                return new Unit();
            });
        }
    }
}
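For comparison, the behavior of the two join patterns can be approximated by hand without Rx. The sketch below is a hypothetical MiniJoinCounter, not Rx's join implementation: each channel becomes a queue guarded by a lock, each pending Get or Inc carries a TaskCompletionSource as its reply port, and Fire repeatedly looks for a pattern whose channels all hold a message. Because the single counter message is dequeued and re-enqueued inside the lock, every read or increment gets exclusive access to the state.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical hand-rolled version of the counter's two join patterns (not Rx).
public sealed class MiniJoinCounter
{
    private readonly object _gate = new object();
    // Private state channel: always holds exactly one message after construction.
    private readonly Queue<int> _counter = new Queue<int>();
    // Request channels; each message carries its reply port.
    private readonly Queue<TaskCompletionSource<int>> _get = new Queue<TaskCompletionSource<int>>();
    private readonly Queue<TaskCompletionSource<bool>> _inc = new Queue<TaskCompletionSource<bool>>();

    public MiniJoinCounter(int init)
    {
        lock (_gate) { _counter.Enqueue(init); }
    }

    public int Get()
    {
        var reply = new TaskCompletionSource<int>();
        lock (_gate) { _get.Enqueue(reply); }
        Fire();
        return reply.Task.Result; // block until a pattern has replied
    }

    public void Inc()
    {
        var reply = new TaskCompletionSource<bool>();
        lock (_gate) { _inc.Enqueue(reply); }
        Fire();
        reply.Task.Wait();
    }

    // Fires enabled patterns until none of them matches.
    private void Fire()
    {
        while (true)
        {
            Action reply = null;
            lock (_gate)
            {
                if (_counter.Count > 0 && _get.Count > 0)
                {
                    // counter AND get: put n back unchanged and reply with n.
                    int n = _counter.Dequeue();
                    var r = _get.Dequeue();
                    _counter.Enqueue(n);
                    reply = () => r.SetResult(n);
                }
                else if (_counter.Count > 0 && _inc.Count > 0)
                {
                    // counter AND inc: put n + 1 back and acknowledge.
                    int n = _counter.Dequeue();
                    var r = _inc.Dequeue();
                    _counter.Enqueue(n + 1);
                    reply = () => r.SetResult(true);
                }
            }
            if (reply == null) return;
            reply(); // send the reply outside the lock
        }
    }
}
```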

Rx Sample

Several people have asked for the source of the sample on Soma’s blog about Rx. The sample explains how to write a prototypical Silverlight-based AJAX application, the killer scenario for Rx, in which you have to mash up events from the UI with asynchronous calls to several Web services. Specifically, the sample makes three concurrent calls to the Bing translation service and shows the first two results that come back.

To run the sample, just create a new Silverlight project. No need to add a Web site for it. Just the basic project with an automatically generated test page is sufficient.

Since I am a complete klutz when it comes to UI design, I do not even try to make it look fancy: just a TextBox for the user input and three TextBlocks to show the results of the translations of the input into various languages. Here are the contents of my MainPage.xaml:

<UserControl x:Class="SomaBlogRx.MainPage"
    xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
    xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
    xmlns:d="http://schemas.microsoft.com/expression/blend/2008"
    xmlns:mc="http://schemas.openxmlformats.org/markup-compatibility/2006"
    mc:Ignorable="d" d:DesignWidth="640" d:DesignHeight="480">
    <Grid x:Name="LayoutRoot">
        <StackPanel>
            <TextBox Name="_input" Width="600" Height="20" Background="Aquamarine"></TextBox>
            <TextBlock Text="Dutch"></TextBlock>
            <TextBlock Name="_dutch" Text="..."></TextBlock>
            <TextBlock Text="French"></TextBlock>
            <TextBlock Name="_french" Text="..."></TextBlock>
            <TextBlock Text="Spanish"></TextBlock>
            <TextBlock Name="_spanish" Text="..."></TextBlock>
        </StackPanel>
    </Grid>
</UserControl>

Any suggestions for a more appealing UI design are very welcome.

The first step is to add a service reference to the Bing translation service. You will have to sign up for a Bing API key at http://msdn.microsoft.com/en-us/library/dd251056.aspx. When you have your key, add a service reference to http://api.bing.net/search.wsdl?AppID=[insert your own key here]&Version=2.2 using BingService for the namespace. In the advanced settings, choose System.Array for the collection type and unselect “Reuse types in referenced assemblies”. If this all works, you will have a BingService entry in the Service References folder of your project:

[Screenshot: the BingService entry in the project's Service References folder]

Once we have the service reference to Bing, we expose the raw generated service as an observable backed by an AsyncSubject<TranslationResponse>, which we hook up to the SearchCompleted event that fires when the SearchAsync call completes. Do not forget to insert your own Bing API key in the code below.

using System;
using System.Collections.Generic;
using System.Linq;
using SomaBlogRx.BingService;

namespace SomaBlogRx
{
    /// <summary>
    /// Observable wrapper over generated Bing service reference.
    /// </summary>
    public static class Bing
    {
        /// <summary>
        /// Pick first translation from response.
        /// </summary>
        public static string GetTranslation(this TranslationResponse response)
        {
            var translation = "";
            if (response != null && response.Results != null && response.Results.Length > 0)
                translation = response.Results[0].TranslatedTerm;
            return translation;
        }

        const string AppId = "[insert your own key here]";

        /// <summary>
        /// Strongly typed null pointer.
        /// </summary>
        public readonly static TranslationResponse NoResult;

        /// <summary>
        /// Asynchronously return the translated text for the given text supplied,
        /// matching the destination language.
        /// </summary>
        /// <param name="text">The text that is to be translated.</param>
        /// <param name="sourceLanguage">The source language as a language code.</param>
        /// <param name="destinationLanguage">The destination language as a language code.</param>
        public static IObservable<TranslationResponse> Translate
                 (this string text, string sourceLanguage, string destinationLanguage)
        {
            var subject = new AsyncSubject<TranslationResponse>();
            var service = new BingPortTypeClient();
            var request = new SearchRequest
            {
                AppId = AppId,
                Translation = new TranslationRequest
                {
                    SourceLanguage = sourceLanguage,
                    TargetLanguage = destinationLanguage,
                },
                Query = text,
                Sources = new[] { SourceType.Translation },
            };
            service.SearchCompleted += (sender, e) =>
            {
                if (e.Cancelled) subject.OnCompleted();
                else if (e.Error != null) subject.OnError(e.Error);
                else
                {
                    // An AsyncSubject only publishes its last value once it completes.
                    subject.OnNext(e.Result.Translation);
                    subject.OnCompleted();
                }
            };
            service.SearchAsync(request);
            return subject.Hide().ObserveOnDispatcher(); // move back to dispatcher thread.
        }
    }
}

Before we return from the wrapper, we hide the IObserver aspect of the subject and make sure that we transition back to the Dispatcher thread to avoid the dreaded “not on the UI thread” exceptions.
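The AsyncSubject<T> semantics the wrapper relies on can be illustrated with a simplified, hypothetical MiniAsyncSubject<T> built on the BCL's IObservable<T>/IObserver<T>: it remembers only the last value it receives and releases it, followed by OnCompleted, when it completes; observers that subscribe afterwards receive the cached outcome immediately. CollectingObserver is likewise a hypothetical helper. Rx's own AsyncSubject<T> handles concurrency and disposal more carefully than this sketch.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified AsyncSubject: caches the last value and publishes it only on completion.
public sealed class MiniAsyncSubject<T> : IObservable<T>, IObserver<T>
{
    private readonly object _gate = new object();
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();
    private bool _done, _hasValue;
    private T _value;
    private Exception _error;

    public void OnNext(T value)
    {
        lock (_gate)
        {
            if (_done) return;
            _value = value;   // remember only the latest value
            _hasValue = true;
        }
    }

    public void OnError(Exception error)
    {
        foreach (var o in Terminate(error)) o.OnError(error);
    }

    public void OnCompleted()
    {
        foreach (var o in Terminate(null)) Publish(o);
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        lock (_gate)
        {
            if (!_done)
            {
                _observers.Add(observer);
                return new Subscription(this, observer);
            }
        }
        // Late subscribers receive the cached outcome immediately.
        if (_error != null) observer.OnError(_error); else Publish(observer);
        return new Subscription(this, observer);
    }

    // Marks the subject as done and returns the observers that still need to be notified.
    private IObserver<T>[] Terminate(Exception error)
    {
        lock (_gate)
        {
            if (_done) return new IObserver<T>[0];
            _done = true;
            _error = error;
            var targets = _observers.ToArray();
            _observers.Clear();
            return targets;
        }
    }

    private void Publish(IObserver<T> observer)
    {
        if (_hasValue) observer.OnNext(_value);
        observer.OnCompleted();
    }

    private sealed class Subscription : IDisposable
    {
        private readonly MiniAsyncSubject<T> _subject;
        private readonly IObserver<T> _observer;
        public Subscription(MiniAsyncSubject<T> subject, IObserver<T> observer) { _subject = subject; _observer = observer; }
        public void Dispose()
        {
            lock (_subject._gate) { _subject._observers.Remove(_observer); }
        }
    }
}

// Hypothetical observer that collects the values it receives.
public sealed class CollectingObserver<T> : IObserver<T>
{
    public readonly List<T> Values = new List<T>();
    public bool Completed;
    public void OnNext(T value) { Values.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { Completed = true; }
}
```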

All the real orchestration is done inside MainPage.xaml.cs. First we expose the KeyUp events of the TextBox _input as an observable collection of type IObservable<IEvent<KeyEventArgs>> using the Observable.FromEvent helper, making sure that the subscription happens on the correct thread.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Windows.Controls;
using System.Windows.Input;

namespace SomaBlogRx
{
    public partial class MainPage : UserControl
    {
        /// <summary>
        /// Return observable collection of KeyUp events on _input
        /// making sure that subscription happens on the dispatcher thread.
        /// </summary>
        IObservable<IEvent<KeyEventArgs>> Input()
        {
            return Observable.FromEvent<KeyEventArgs>(_input, "KeyUp").SubscribeOnDispatcher();
        }

Since we do not want to invoke a search upon each keystroke, we throttle the input stream such that it only fires an event when the user has paused typing for ½ second. The simple LINQ query transforms the observable collection of KeyUp events into an observable collection of strings containing the Text of the input after each KeyUp event:

        public MainPage()
        {
            InitializeComponent();

            // Read words from _input once user pauses for 1/2 second
            var words = (from _ in Input() select _input.Text).Throttle(TimeSpan.FromSeconds(0.5));

The core of the sample fires off three Bing searches (let dutch = text.Translate("en", "nl"), …) for the word that the user has typed, and then uses a join pattern to wait for the first two of the three results to come back. You can think of join patterns as Outlook rules: instead of specifying what happens when a new email comes in, you specify rules for what happens when a new notification arrives on an observable collection. If the user types a new word before the results of the service calls have arrived, we ignore the results using the TakeUntil(words) function.

            // Fire off three concurrent calls to the Bing service
            // Wait for two out of three to return,
            // or until the user types the next word
            var translations =
                from text in words
                let dutch = text.Translate("en", "nl")
                let french = text.Translate("en", "fr")
                let spanish = text.Translate("en", "es")
                from results in Observable.Join
                   (dutch.And(spanish).Then((d, s) =>
                        new { Dutch = d, French = Bing.NoResult, Spanish = s })
                   , dutch.And(french).Then((d, f) =>
                        new { Dutch = d, French = f, Spanish = Bing.NoResult })
                   , french.And(spanish).Then((f, s) =>
                        new { Dutch = Bing.NoResult, French = f, Spanish = s })
                   ).TakeUntil(words)
                select results;


Finally, we can subscribe to the observable collection of translations and update the UI with the translations provided by Bing:

            // Update the UI with the results
            translations.Subscribe(w =>
            {
                _dutch.Text   = w.Dutch.GetTranslation();
                _french.Text  = w.French.GetTranslation();
                _spanish.Text = w.Spanish.GetTranslation();
            });
        }
    }
}

While this is a much simplified example, it captures the essential design pattern of many AJAX applications, where we need to react to UI events, orchestrate a number of concurrent Web service calls as the result of these UI events, and update the UI with the results.
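The throttling step at the start of the pipeline is what keeps the sample from calling the service on every keystroke. Its semantics can be modeled deterministically as a pure function over timestamped keystrokes: a value is passed on only if no newer value arrives within the quiet period. ThrottleModel is a hypothetical helper written for illustration; Rx's Throttle operator implements the same idea with timers on a scheduler rather than over a precomputed list.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model of throttle (debounce) semantics over timestamped events; not Rx's implementation.
public static class ThrottleModel
{
    // Events must be sorted by arrival time. A value is emitted only if the next event
    // arrives at least `quiet` later; the final event is always emitted (assuming the
    // stream stays open long enough for its quiet period to elapse).
    public static List<T> Throttle<T>(IList<(TimeSpan At, T Value)> events, TimeSpan quiet)
    {
        var emitted = new List<T>();
        for (int i = 0; i < events.Count; i++)
        {
            bool superseded = i + 1 < events.Count && events[i + 1].At - events[i].At < quiet;
            if (!superseded) emitted.Add(events[i].Value);
        }
        return emitted;
    }
}
```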