Rx and Scheduling – A Misconception

2 Nov

A common misconception I see when working with the Reactive Extensions (Rx) concerns how scheduling is achieved. Let us look at the following example.

var stream = Observable
    .Interval(TimeSpan.FromSeconds(1))
    .ObserveOn(Scheduler.ThreadPool)
    .Subscribe(i => Console.WriteLine("New Value " + i));

In this simple example, a value of type long is produced every second, starting at zero and incrementing forever. Our ObserveOn call marshals each notification to a thread in the thread pool, and we write the result to the Console.

A concern was raised a while back that, because we’re observing on a thread pool thread, we’re holding on to that thread for the duration of the stream. As we know, thread pool threads should not be held for any long period of time, so this could potentially be a bad thing.

However, the truth of Rx is much simpler. The ObserveOn call simply tells Rx to use the specified scheduler whenever a new value is delivered through OnNext. Therefore, every second, Rx takes a thread from the thread pool and marshals the call to that thread. Once the handler returns, the thread goes back to the pool. On that basis, we can see we’re using the thread only for as long as the code in the OnNext handler is running.

Don’t believe me? Pop this code into LINQPad (or into a console app, swapping the Dump() call for Console.WriteLine), and you can see the results for yourself:

var stream = Observable.Interval(TimeSpan.FromSeconds(1));
stream
	.ObserveOn(Scheduler.NewThread)
	.Subscribe(i => String.Format("New Value on Thread {0}", Thread.CurrentThread.ManagedThreadId).Dump());