Rx and Scheduling – A Misconception

2 Nov

A common misconception I see during work with the reactive framework is how scheduling is achieved.  Let us look at the following example.

var stream = Observable
    .Interval(TimeSpan.FromSeconds(1))
    .ObserveOn(Scheduler.ThreadPool)
    .Subscribe(i => Console.WriteLine("New Value" + i));

What this simple example is doing, is every 1 second, a value of type long is yielded, starting at zero and incrementing forever. Our ObserveOn call marshals the observation to a thread in the threadpool and we output the result to the Console.

A concern was raised a while back in that as we’re observing on a thread pool thread, we’re holding on to this threadpool thread for the duration of the stream. As we know, thread pool threads should not be held on for any long duration of time so this could potentially be a bad thing.

However, the truth of Rx is much simpler. The ObserveOn call simply tells Rx to use the specified scheduler when a new value is OnNexted. Therefore, every second, Rx pulls a thread from the thread pool and marshals the call to that thread. On that basis, we can see we’re using the thread for only the period of time that the code in the OnNext function is being run.

Don’t believe me? Pop this code into Linqpad or a console app, and you can see the results for yourself:

var stream = Observable.Interval(TimeSpan.FromSeconds(1));
stream
	.ObserveOn(Scheduler.NewThread)
	.Subscribe(i => String.Format("New Value on Thread {0}", Thread.CurrentThread.ManagedThreadId).Dump());
Advertisements

Rx and the Async CTP

1 Nov

With the news about C# 5.0 and the new asynchronous keywords (async and await), it comes as no surprise that the Reactive Framework team is in on this.

I was a bit dubious to the future of Rx after seeing Anders’ video, but on the day of the PDC, the Rx team released a new version of the library that allows us to use the new async compiler magic with Rx.

Since this new async CTP doesn’t rely on framework features, a new runtime or anything other than convention, all that the Rx team needed to do to have the framework work, allow the conversion of an IObservable<T> to a class that conforms to the Awaiter convention.  As the spec says:

The expression t of an await-expression await t is called the task of the await expression. The task t is required to be awaitable, which means that all of the following must hold:

  • (t).GetAwaiter() is a valid expression of type A.
  • Given an expression a of type A and an expression r of type System.Action, (a).BeginAwait(r) is a valid boolean-expression.
  • Given an expression a of type A, (a).EndAwait() is a valid expression.

A is called the awaiter type for the await expression. The GetAwaiter method is used to obtain an awaiter for the task.

The BeginAwait method is used to sign up a continuation on the awaited task. The continuation passed is the resumption delegate, which is further explained below.

The EndAwait method is used to obtain the outcome of the task once it is complete.

The method calls will be resolved syntactically, so all of GetAwaiter, BeginAwait and EndAwait can be either instance members or extension methods, or even bound dynamically, as long as the calls are valid in the context where the await expression appears. All of them are intended to be “non-blocking”; that is, not cause the calling thread to wait for a significant amount of time, e.g. for an operation to complete.


What this means to us Rx’ers, is that we can leverage the GetAwaiter() extension method provided in the Rx framework to convert between an IObservable<T> into something that works with the new async and await keywords.  This method can be found in System.Reactive.dll:

public static ObservableAwaiter<TSource> GetAwaiter<TSource>(this IObservable<TSource> source);

A C# Wish – Static Extension Methods

1 Nov

Something I’d love to have in C# – static extension methods.  While extension methods are extremely powerful – look what LINQ and Rx have done with them, I’d love this extra little bit of help.

As an example, the Rx framework have quite a few static helper classes.  For example, Observable.Interval(Timespan) will return a stream of data that yields a value on every interval of the timespan you specify.  The Observable static class is full of these methods.

To make any methods you write that fit under the scope of this Observable class discoverable, you would ideally write an extension method on the Observable class.  However, in the current C# spec, static extension methods only can be called on an instance of a class and not on the the type.

Added to wish list. 🙂

Rx–Scheduling

29 Oct

With Rx, comes the ability to use different schedulers to have different scheduling effects.  By default the following schedulers are provided via the static Scheduler class.

public static CurrentThreadScheduler CurrentThread { get; }
public static DispatcherScheduler Dispatcher { get; }
public static ImmediateScheduler Immediate { get; }
public static NewThreadScheduler NewThread { get; }
public static TaskPoolScheduler TaskPool { get; }
public static ThreadPoolScheduler ThreadPool { get; } 

As we design our classes for testability, yes you do really, if our class uses Rx and uses a scheduler, such as the NewThreadScheduler, this makes it very hard to test our code.

For this blog post, we’ll be ignoring the TestScheduler as this post is more on class design as to testing the code directly.

The simplest solution, and hence the short blog post, is for once, we have a Microsoft library that uses interfaces. 🙂  All schedulers implement IScheduler which means we can wrap up all the static schedulers into an interface and inject them into our class.  For e.g.

public interface ISchedulerService
{
public static IScheduler CurrentThread { get; }
public static IScheduler Dispatcher { get; }
public static IScheduler Immediate { get; }
public static IScheduler NewThread { get; }
public static IScheduler TaskPool { get; }
public static IScheduler ThreadPool { get; } }
}
public class SchedulerService : ISchedulerService
{
public static IScheduler CurrentThread { get { return Scheduler.CurrentThread; } }
public static IScheduler Dispatcher { get { return Scheduler.Dispatcher ; } }
public static IScheduler Immediate { get { return Scheduler.Immediate ; } }
public static IScheduler NewThread { get { return Scheduler.NewThread ; } }
public static IScheduler TaskPool { get { return Scheduler.TaskPool ; } }
public static IScheduler ThreadPool { get { return Scheduler.ThreadPool ; } }
}

Now that our scheduler service is interfaced out, this means we can now mock out the ISchedulerService in our tests.

Simple, but neat.

Reactive Extension Design Guidelines

28 Oct

The Rx team have released some official guidelines for our consumption.  It is a pretty interesting read and has some good tips.

My favourites:

Consider drawing a Marble-diagram
Draw a marble-diagram of the observable sequence you want to create. By drawing the diagram, you will get a clearer picture on what operator(s) to use.

Subscribe implementations should not throw

As multiple observable sequences are composed, subscribe to a specific observable sequence might not happen at the time the user calls Subscribe (e.g. Within the Concat operator, the second observable sequence argument to Concat will only be subscribed to once the first observable sequence has completed). Throwing an exception would bring down the program. Instead exceptions in subscribe should be tunneled to the OnError method.

Implement new operators by composing existing operators.
Many operations can be composed from existing operators. This will lead to smaller, easier to maintain code. The Rx team has put a lot of effort in dealing with all corner cases in the base operators. By reusing these operators you’ll get all that work for free in your operator.

Edge UG Videos

21 Oct

Since we created some great content in the 3 or so years, I thought I’d bring a blog post of all the cool videos Ian Smith did for us.  Some great reminiscing here. Smile

When Agile Goes Bad – Seb Lambla

MVC Best Practises – Seb Lambla

Hyper-V – Jeremy Pack

Visual Studio 2010 – Jason Zander

OWASP – Top Ten Vulnerabilities – Barry Dorrans:

Part 1

Part 2

Iron Ruby – Ben Hall

Mix10 Recap – Adam Kinney

Part 1

Part 2

F# – Phil Trelford

.NET Package Management Tools – A thought

18 Oct

I was chatting with Mark Rendle on Twitter this evening about the emergence of some pretty neat package management tools for .NET devs.

My main gist was that I don’t see why there is such a clamouring for these tools.  At the risk of sounding like a 97 1/2 year old developer, I’ve not seen much need for a hard-core package management app.  In the current project that I am involved in, we have aournd 24 assemblies, with probably a total of 20-25 dependencies on external DLLs (RhinoMocks, PRISM, Rx etc.)

At no point do we feel overwhelmed with the number, or the requirement for keeping these up to date.  Our primary concern is the app itself and the intricacies in getting Fx prices onto a screen where a trader can place his/her trades.

While I do see the ability to retrieve and update packages in the Ruby  gem way, I don’t see an uptake that the propaganda seems to be promising.  What we do need however, is the take-up where a software process includes this as a “standard” like source control instead of something that we could have.

I guess in my old age, I’d like to see the feedback loop kick off a little more before I zimmer my way down that road.