Tag Archives: Reactive Extensions

A big Rx Release

13 Feb

Looks like the Rx team have been super busy.  After our Christmas release, we have another massive build to try out. 

Some key features:

  1. Publish, Prune and Replay have been reverted to their previous signatures.
  2. Expand operator that recursively expands an initial observable sequence using a selector function.
  3. A set of FastSubjects!

Full Release notes:

Various fixes:

  • Added generic variance annotations on ISubject<in T1, out T2> and IObserver<in TValue, out TResult>.
  • Enabling generic variance annotations for Silverlight 4 binaries.
  • No longer installing Silverlight assemblies to the GAC; fixes build issues with Expression Blend.
  • Made Unit, TimeStamped<T>, TimeInterval<T>, Notification<T> serializable.
  • Fixes to System.Linq.Async’s Single and SingleOrDefault operators.

Scheduler improvements:

  • ControlScheduler and NewThreadScheduler now correctly use relative time.
  • CurrentThreadScheduler, EventLoopScheduler, NewThreadScheduler and VirtualScheduler now normalize time properly.
  • ImmediateScheduler, CurrentThreadScheduler and NewThreadScheduler no longer cause a thread sleep if the specified TimeSpan is zero.
  • DispatcherScheduler, ThreadPoolScheduler and SynchronizationContextScheduler avoid using a timer if a scheduled action’s specified TimeSpan is zero.
  • CurrentThreadScheduler on Windows Phone 7 no longer depends on ThreadStaticAttribute which is not implemented on Windows Phone 7.

Changes to observable operators:

  • Multicast, Publish, Prune and Replay binding operators:
    • Changed return type of Multicast to IConnectableObservable<T>.
    • Changed Multicast overload with an observable sequence selector function to take in a subject selector function, allowing deferred construction of the subject being used.
    • Bringing back Publish, Prune and Replay operators, each of them calling into Multicast to fix the subject parameterization ([Behavior]Subject, AsyncSubject and ReplaySubject respectively).
    • Removed Publish overload that simply returns an IObservable<T>.
  • Buffering and windowing operators:
    • BufferWithTime, BufferWithCount and BufferWithTimeOrCount now return IObservable<IList<T>> as they used to do before.
    • Added WindowWithTime, WindowWithCount and WindowWithTimeOrCount which return IObservable<IObservable<T>>, as special cases of the general purpose Window operator.
    • Fixed issue with duplicate or missing values due to timer errors.
  • Added ToList, ToArray, ToDictionary and ToLookup aggregation standard query operators, returning an observable sequence with the (single) aggregated value.
  • Added SequenceEqual operator which checks that the values in a stream are equal (but not the timing).
  • Added Expand operator that recursively expands an initial observable sequence using a selector function.
  • Added Concat overload that takes an observable sequence of observable sequences and produces a single observable sequence which exhausts each inner observable sequence before subscribing to the next observable sequence.
  • Added Merge overload that allows specifying the maximum number of active sequences being merged at any point in time, queuing the rest in a way similar to Concat.
  • Added ManySelect operator which is the comonadic bind operator and is similar to ContinueWith on tasks but can be used for streams.
  • Added IgnoreValues operator which behaves as Where(_ => false).
  • Added GroupByUntil operator which is similar to GroupBy except that there’s a duration selector function to control the lifetime of each generated group.
  • Removed Drain. The same behavior can be restored using the (updated) more general purpose Concat and Merge(n) operators.
  • Fixed memory leaks in GroupJoin and Join where these operators held onto window disposables until the end of the (potentially infinite) outer source.

Subject changes:

  • Added FastSubject, FastBehaviorSubject, FastAsyncSubject and FastReplaySubject which are much faster than regular subjects but:
    • don’t decouple producer and consumer by an IScheduler (effectively limiting them to ImmediateScheduler);
    • don’t protect against stack overflow;
    • don’t synchronize input messages.
  • Fast subjects are used by Publish and Prune operators if no scheduler is specified.

Qbservable changes:

  • Operators that take multiple observable sequences now check whether the supplied sequences implement IQbservable<T>. If so, their expression tree gets stitched in; otherwise, a ConstantExpression is used to wrap the sequence.

Download the different release flavors here:

New Version of Rx – Join Support

31 Dec

Looks like the Rx team released a lovely new version of the framework.  Highlights are Join and GroupJoin support and better naming (in my opinion) for the Publish, Prune and Replay operators.

Full release notes
Downloads:

ObservableCollection to IObservable

9 Nov

Ever wanted your ObservableCollection<T>’s CollectionChanged event wrapped up in an IObservable stream? Try this extension method:

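// Declare this inside a static class, as with any extension method.
public static IObservable<NotifyCollectionChangedEventArgs> ToStream<T>(this ObservableCollection<T> source)
{
	var stream = Observable.FromEvent(
			// Convert Rx's generic handler into the delegate type the event expects.
			(EventHandler<NotifyCollectionChangedEventArgs> ev)
			   => new NotifyCollectionChangedEventHandler(ev),
			ev => source.CollectionChanged += ev,
			ev => source.CollectionChanged -= ev)
		.Select(e => e.EventArgs);
	return stream;
}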
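If you are curious what a wrapper like this boils down to, here is a minimal hand-rolled sketch that uses only the IObservable<T> and IObserver<T> interfaces from the base class library. It is not how Rx implements FromEvent; the names ToStreamByHand, ChangeStream, Unsubscriber and ActionObserver are all made up for this illustration.

```csharp
using System;
using System.Collections.ObjectModel;
using System.Collections.Specialized;

// Hypothetical helper: lets us observe with a lambda without referencing Rx.
public sealed class ActionObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    public ActionObserver(Action<T> onNext) { _onNext = onNext; }
    public void OnNext(T value) { _onNext(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class CollectionChangedSketch
{
    // Hypothetical helper: runs the given action once, when disposed.
    private sealed class Unsubscriber : IDisposable
    {
        private Action _onDispose;
        public Unsubscriber(Action onDispose) { _onDispose = onDispose; }
        public void Dispose()
        {
            var action = _onDispose;
            _onDispose = null;
            if (action != null) action();
        }
    }

    private sealed class ChangeStream<T> : IObservable<NotifyCollectionChangedEventArgs>
    {
        private readonly ObservableCollection<T> _source;
        public ChangeStream(ObservableCollection<T> source) { _source = source; }

        public IDisposable Subscribe(IObserver<NotifyCollectionChangedEventArgs> observer)
        {
            // Attach a handler that forwards every event to the observer...
            NotifyCollectionChangedEventHandler handler = (sender, e) => observer.OnNext(e);
            _source.CollectionChanged += handler;
            // ...and detach it again when the subscription is disposed.
            return new Unsubscriber(() => _source.CollectionChanged -= handler);
        }
    }

    public static IObservable<NotifyCollectionChangedEventArgs> ToStreamByHand<T>(
        this ObservableCollection<T> source)
    {
        return new ChangeStream<T>(source);
    }
}
```

Subscribing with an ActionObserver and then adding an item to the collection pushes one NotifyCollectionChangedEventArgs through the stream; disposing the subscription unhooks the handler, so later changes are no longer observed.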

Rx and Scheduling – A Misconception

2 Nov

A common misconception I see when working with the Reactive Framework concerns how scheduling is achieved.  Let us look at the following example.

// Subscribe returns an IDisposable; dispose it to stop observing.
var subscription = Observable
    .Interval(TimeSpan.FromSeconds(1))
    .ObserveOn(Scheduler.ThreadPool)
    .Subscribe(i => Console.WriteLine("New Value " + i));

In this simple example, a value of type long is yielded every second, starting at zero and incrementing forever. Our ObserveOn call marshals each observation to a thread in the thread pool, and we write the result to the Console.

A concern was raised a while back: because we're observing on a thread pool thread, are we holding on to that thread for the duration of the stream? As we know, thread pool threads should not be held for any long period of time, so this could potentially be a bad thing.

However, the truth of Rx is much simpler. The ObserveOn call simply tells Rx to use the specified scheduler when a new value is OnNexted. Therefore, every second, Rx pulls a thread from the thread pool and marshals the call to that thread. On that basis, we can see we’re using the thread for only the period of time that the code in the OnNext function is being run.

Don’t believe me? Pop this code into LINQPad and you can see the results for yourself (Dump() is a LINQPad extension; in a console app, use Console.WriteLine instead):

var stream = Observable.Interval(TimeSpan.FromSeconds(1));
stream
	.ObserveOn(Scheduler.NewThread)
	.Subscribe(i => String.Format("New Value on Thread {0}", Thread.CurrentThread.ManagedThreadId).Dump());
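To make the "one short work item per value" idea concrete without pulling in Rx at all, here is a deliberately simplified sketch built on the plain thread pool. It is not Rx's ObserveOn implementation (in particular, it makes no attempt to keep notifications in order); ThreadPoolObserver, DelegateObserver and ObserveOnSketch are hypothetical names invented for this post.

```csharp
using System;
using System.Threading;

// Hypothetical, simplified stand-in for the idea behind ObserveOn:
// every notification becomes its own short-lived thread pool work item.
public sealed class ThreadPoolObserver<T> : IObserver<T>
{
    private readonly IObserver<T> _inner;
    public ThreadPoolObserver(IObserver<T> inner) { _inner = inner; }

    public void OnNext(T value)
    {
        // A pool thread is borrowed only while _inner.OnNext runs,
        // then it goes straight back to the pool.
        ThreadPool.QueueUserWorkItem(_ => _inner.OnNext(value));
    }

    public void OnError(Exception error)
    {
        ThreadPool.QueueUserWorkItem(_ => _inner.OnError(error));
    }

    public void OnCompleted()
    {
        ThreadPool.QueueUserWorkItem(_ => _inner.OnCompleted());
    }
}

// Hypothetical helper so the sketch can be driven with a lambda.
public sealed class DelegateObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    public DelegateObserver(Action<T> onNext) { _onNext = onNext; }
    public void OnNext(T value) { _onNext(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class ObserveOnSketch
{
    public static void Run()
    {
        using (var done = new CountdownEvent(3))
        {
            var observer = new ThreadPoolObserver<long>(new DelegateObserver<long>(i =>
            {
                Console.WriteLine("Value {0} on thread {1}", i, Thread.CurrentThread.ManagedThreadId);
                done.Signal();
            }));

            // This loop stands in for the ticks that Interval would produce.
            for (long i = 0; i < 3; i++) observer.OnNext(i);

            done.Wait(); // block only so the demo does not exit early
        }
    }
}
```

Nothing here parks a thread between values: the work item ends as soon as the callback returns, which is the point the post is making about ObserveOn.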

Rx and the Async CTP

1 Nov

With the news about C# 5.0 and the new asynchronous keywords (async and await), it comes as no surprise that the Reactive Framework team is in on this.

I was a bit dubious about the future of Rx after seeing Anders’ video, but on the day of the PDC, the Rx team released a new version of the library that allows us to use the new async compiler magic with Rx.

Since this new async CTP doesn’t rely on framework features, a new runtime or anything other than convention, all the Rx team needed to do to make the framework work was to allow the conversion of an IObservable<T> to a type that conforms to the awaiter convention.  As the spec says:

The expression t of an await-expression await t is called the task of the await expression. The task t is required to be awaitable, which means that all of the following must hold:

  • (t).GetAwaiter() is a valid expression of type A.
  • Given an expression a of type A and an expression r of type System.Action, (a).BeginAwait(r) is a valid boolean-expression.
  • Given an expression a of type A, (a).EndAwait() is a valid expression.

A is called the awaiter type for the await expression. The GetAwaiter method is used to obtain an awaiter for the task.

The BeginAwait method is used to sign up a continuation on the awaited task. The continuation passed is the resumption delegate, which is further explained below.

The EndAwait method is used to obtain the outcome of the task once it is complete.

The method calls will be resolved syntactically, so all of GetAwaiter, BeginAwait and EndAwait can be either instance members or extension methods, or even bound dynamically, as long as the calls are valid in the context where the await expression appears. All of them are intended to be “non-blocking”; that is, not cause the calling thread to wait for a significant amount of time, e.g. for an operation to complete.


What this means for us Rx’ers is that we can leverage the GetAwaiter() extension method provided in the Rx framework to convert an IObservable<T> into something that works with the new async and await keywords.  This method can be found in System.Reactive.dll:

public static ObservableAwaiter<TSource> GetAwaiter<TSource>(this IObservable<TSource> source);
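To show how little convention is involved, here is a rough sketch of a GetAwaiter extension written against the base class library only. Two assumptions to flag loudly: it targets the awaiter shape that shipped with C# 5 (IsCompleted, OnCompleted, GetResult, borrowed here from TaskAwaiter<T>) rather than the CTP's BeginAwait/EndAwait, and resolving with the last value of the sequence is this sketch's own choice. It is not Rx's ObservableAwaiter, and it will clash with Rx's own GetAwaiter if you paste it into a project that references Rx. LastValueObserver and OneTwoThree are hypothetical names.

```csharp
using System;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

// Hypothetical source: pushes 1, 2, 3 synchronously, then completes.
public sealed class OneTwoThree : IObservable<int>
{
    private sealed class Nothing : IDisposable { public void Dispose() { } }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        for (int i = 1; i <= 3; i++) observer.OnNext(i);
        observer.OnCompleted();
        return new Nothing();
    }
}

public static class AwaitableObservableSketch
{
    // Hypothetical observer: remembers the last value and completes a Task with it.
    private sealed class LastValueObserver<T> : IObserver<T>
    {
        private readonly TaskCompletionSource<T> _tcs = new TaskCompletionSource<T>();
        private bool _hasValue;
        private T _last;

        public Task<T> Completion { get { return _tcs.Task; } }

        public void OnNext(T value) { _hasValue = true; _last = value; }

        public void OnError(Exception error) { _tcs.TrySetException(error); }

        public void OnCompleted()
        {
            if (_hasValue) _tcs.TrySetResult(_last);
            else _tcs.TrySetException(new InvalidOperationException("Sequence contains no elements."));
        }
    }

    // The compiler only looks for a method called GetAwaiter; an extension method qualifies.
    public static TaskAwaiter<T> GetAwaiter<T>(this IObservable<T> source)
    {
        var observer = new LastValueObserver<T>();
        source.Subscribe(observer); // disposing the subscription is omitted for brevity
        return observer.Completion.GetAwaiter();
    }

    // Usage: await the observable directly.
    public static async Task<int> LastOfOneTwoThreeAsync()
    {
        return await new OneTwoThree();
    }
}
```

The only thing that makes `await new OneTwoThree()` compile is that a suitable GetAwaiter is in scope, which is exactly the convention-over-framework point made above.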

Rx–Scheduling

29 Oct

With Rx comes the ability to use different schedulers to achieve different scheduling effects.  By default, the following schedulers are provided via the static Scheduler class.

public static CurrentThreadScheduler CurrentThread { get; }
public static DispatcherScheduler Dispatcher { get; }
public static ImmediateScheduler Immediate { get; }
public static NewThreadScheduler NewThread { get; }
public static TaskPoolScheduler TaskPool { get; }
public static ThreadPoolScheduler ThreadPool { get; } 

We design our classes for testability (yes, you do, really), but if a class uses Rx and reaches directly for a scheduler such as the NewThreadScheduler, the code becomes very hard to test.

For this blog post, we’ll be ignoring the TestScheduler, as this post is more about class design than about testing the code directly.

The simplest solution, and hence the short blog post, is that, for once, we have a Microsoft library that uses interfaces. 🙂  All schedulers implement IScheduler, which means we can wrap up all the static schedulers behind an interface and inject it into our class.  For example:

public interface ISchedulerService
{
    // Interface members are implicitly public instance members.
    IScheduler CurrentThread { get; }
    IScheduler Dispatcher { get; }
    IScheduler Immediate { get; }
    IScheduler NewThread { get; }
    IScheduler TaskPool { get; }
    IScheduler ThreadPool { get; }
}

// Production implementation: forwards to the static Scheduler class.
public class SchedulerService : ISchedulerService
{
    public IScheduler CurrentThread { get { return Scheduler.CurrentThread; } }
    public IScheduler Dispatcher { get { return Scheduler.Dispatcher; } }
    public IScheduler Immediate { get { return Scheduler.Immediate; } }
    public IScheduler NewThread { get { return Scheduler.NewThread; } }
    public IScheduler TaskPool { get { return Scheduler.TaskPool; } }
    public IScheduler ThreadPool { get { return Scheduler.ThreadPool; } }
}

Now that our scheduler service sits behind an interface, we can mock out ISchedulerService in our tests.

Simple, but neat.

Reactive Extension Design Guidelines

28 Oct

The Rx team have released some official guidelines for our consumption.  It is a pretty interesting read and has some good tips.

My favourites:

Consider drawing a Marble-diagram
Draw a marble-diagram of the observable sequence you want to create. By drawing the diagram, you will get a clearer picture on what operator(s) to use.

Subscribe implementations should not throw

As multiple observable sequences are composed, subscribe to a specific observable sequence might not happen at the time the user calls Subscribe (e.g. Within the Concat operator, the second observable sequence argument to Concat will only be subscribed to once the first observable sequence has completed). Throwing an exception would bring down the program. Instead exceptions in subscribe should be tunneled to the OnError method.
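As a rough illustration of that guideline, here is a sketch of an observable whose Subscribe catches setup failures and hands them to OnError instead of throwing at the caller. It uses only the base class library interfaces; SafeObservable, EmptyDisposable and ErrorRecorder are hypothetical names for this example, not types from Rx.

```csharp
using System;

// Hypothetical observable: wraps subscription logic that might throw.
public sealed class SafeObservable<T> : IObservable<T>
{
    private readonly Func<IObserver<T>, IDisposable> _subscribe;

    public SafeObservable(Func<IObserver<T>, IDisposable> subscribe)
    {
        _subscribe = subscribe;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        try
        {
            return _subscribe(observer);
        }
        catch (Exception error)
        {
            // Tunnel the failure to the observer rather than throwing at the caller.
            observer.OnError(error);
            return EmptyDisposable.Instance;
        }
    }

    // Nothing to clean up when subscription setup failed.
    private sealed class EmptyDisposable : IDisposable
    {
        public static readonly EmptyDisposable Instance = new EmptyDisposable();
        public void Dispose() { }
    }
}

// Hypothetical observer that just remembers the error it was given.
public sealed class ErrorRecorder<T> : IObserver<T>
{
    public Exception Error { get; private set; }
    public void OnNext(T value) { }
    public void OnError(Exception error) { Error = error; }
    public void OnCompleted() { }
}
```

With this shape, a subscription that happens late (for example, deep inside a composed sequence) reports its failure through the same OnError channel as any other error, so whoever is observing gets a chance to handle it.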

Implement new operators by composing existing operators.
Many operations can be composed from existing operators. This will lead to smaller, easier to maintain code. The Rx team has put a lot of effort in dealing with all corner cases in the base operators. By reusing these operators you’ll get all that work for free in your operator.