Важно: материал в этом посте немного устарел, но у меня есть классный видеокурс по современным Reactive Extensions, который покрывает как базовые так и продвинутые сценарии.
Уверен что многие из вас заметили как на CodeBetter участились посты на тему Reactive Extensions. Мне кажется что имеет смысл попробовать рассказать в одном посте про самые главные особенности этого фреймворка, что я и собираюсь сделать. Также, я расскажу про новые фичи LINQ.
Введение
Reactive Extensions или Rx (по аналигии с PFX) или “реактивные расширения” – это библиотека, точнее набор библиотек, которые позволяют работать с событиями и асинхронными вызовами в композиционном стиле, в т.ч. через Linq. Цель этого фреймворка – помочь разработчикам писать код в котором фигурирует асинхронное взаимодействие.
Rx был создан в Microsoft DevLabs. На данный момент, фреймворк поддерживает .Net 3.5SP1, .Net 4RC, Silverlight 3 и JavaScript (!!!). В отличии от того же Parallel Extensions, Rx не войдет в состав .Net 4, хотя ключевые его интерфейсы IObserver<T>
и IObservable<T>
туда попадут.
Фреймворк Rx состоит из шести пяти сборок, одна из которых – PFX (System.Threading.dll). Если вам нравится подключать 5 сборок в проект – пожалуйста – но я рекоммендую просто делать ILMerge и использовать одну большую толстую сборку ReactiveExtensions.dll.
N.b.: у меня аллергия на многосборочность, поэтому я и вам советую скачать ILmerge и чуть-чуть упростить свою разработку.
Подключив сборку в проект, вы получите не только новые классы и интерфейсы, но также методы расширения.
Идеология Rx
Менталитет Rx сводится к так называемым push-коллекциям. Разница примерно такая: pull-коллекции предоставляют свое содержимое по запросу – вы вызываете MoveNext()
пока не закончатся элементы. Push-коллекции же сами “толкают” элементы в вашем направлении – вам только нужно подписаться на выдачу элементов. При этом, когда я говорю “элементы” я не имею ввиду только элементы коллекций. Например, если происходит событие, то push-коллекция в контексте Rx может выдавать аргументы события. Или результат асинхронного вызова. Вообщем, приложение становится “реактивным”, то есть учится реагировать на события и аггрегировать их для более удобного использования.
Rx предлагает два новый интерфейса – IObserver<T>
и IObservable<T>
. Эти интерфейсы используются для реализции объектов со встроенной поддержкой “реактивности”, хотя на самом деле Rx можно использовать и без их прямого использования.
Итак, возьмем минималистичный пример – подписку на событие. Например, есть у нас события передвижения мыши и мы хотим записать первые десять сгенерированных событий в ListBox
. Делается это так:
var o = Observable.FromEvent<MouseEventArgs>( this, "MouseMove").Take(10); o.Subscribe(x => listBox.Items.Add( string.Format("{0},{1}", x.EventArgs.X, x.EventArgs.Y)));
Начнем наше обсуждение с класса Observable
. Этот класс позволяет конвертировать различные наборы состояний (например коллекции IEnumerable
или результаты событий) в push-коллекцию типа IObservable<T>
. Например, в нашем случае мы конвертируем первые десять вызовов события MouseMove
(литерал тут не очень к месту, но это сейчас не важно) в тип IObservable<IEvent<MouseEventArgs>>
.
Теперь второе выражение, которое не столь интуитивно. Суть в том, что для того чтобы подписаться на push-нотификации от IObservable
, нужно вызывать метод Subscribe()
на самом IObservable
. Это не очень интуитивно, но что поделать. В нашем примере мы делаем подписку на первые 10 событий MouseMove
которые будут вызваны у формы. Для каждого из событий, а точнее для каждого успешного события (об этом далее), мы вызываем код который добавляет координаты в список listBox
.
Подробно о IObserver и IObservable
В контексте Rx, интерфейсы IObserver
и IObservable
играют важную роль, даже если мы не реализуем их в наших классах. Как вы уже догадались, IObservable<T>
реализует push-коллекция которая умеет выдывать объекты типа T
. IObserver
– это интерфейс, который реализует тот, кто подписан на уведомления от IObservable<T>
.
Начнем с интерфейса IObserver
. В нем всего три метода:
void OnNext(T value)
Это главный метод, который вызывается в момент “поставки” нового значения. Тот делегат что мы предоставили в примере – это делегат для обработкиOnNext()
.void OnError(Exception exception)
Этот метод вызывается когда что-то идет не так и нужно сказать наблюдателю о том, что произошла ошибка. В параметре метода хранится нужная информация о возникшем исключении.void OnCompleted()
Вызывается когда у коллекции кончились элементы для выдачи. В нашем примере выше, этот метод вызвался после 20го элемента, но поскольку мы его не обрабатывали, ничего не произошло.
Порядок вызовов обычно фиксирован – несколько OnNext()
, а потом либо OnError()
либо OnCompleted()
.
Теперь давайте обсудим IObservable<T>
. Тут все в каком смысле проще – точнее так может показаться на первый взгляд. У этого интерфейса всего один метод:
IDisposable Subscribe(IObserver<T> observer)
Позволяет наблюдателю подписаться на события. При этом, выдается ‘disposable token’ (это такое название паттерна, я только что придумал), то есть выдаетсяIDisposable
подписчика на котором можно в последствии вызватьDispose()
и тем самым “закрыть подписку”.
Мы уже встретили метод Subscribe()
в нашем примере. Помимо поддержки этого метода для Observable
и IObservable<T>
, этот метод также реализован для IEnumerable<T>
. В принципе, реализовать свой метод Subscribe()
просто – нужно просто держать список всех подписчиков и в методе Subscribe()
добавить подписчика в список и вернуть его как IDisposable
:
public IDisposable Subscribe(IObserver<Location> observer) { observers.Add(observer); // Announce current location to new observer. observer.OnNext(this.Location); return observer as IDisposable; }
Полный пример реализации можно посмотреть на сайте MSDN.
Observer и Observable
В отличии от интерфейсов IObserver<T>
и IObservable<T>
, которые находятся в сборке System.Observable
, классы Observable
и Observer
находятся в сборке System.Reactive
, являются статическими, и главной их функцией являются LINQ-образные расширения для Rx (собственно то, что некоторые называют Linq to Events). Думаю что тут не обойтись без более-менее детализированного описания того что внутри этих классов, благо в них есть весьма забавные надстройки над Linq.
Начнем с простого. Класс Observer
позволяет, в первую очередь, создавать объекты типа IObserver<T>
с помощью комбинации обработчиков System.Action
для различных комбинаций OnNext()
, OnError()
и OnException()
. Зачем это надо? Все очень просто – мало кому хочется реализовывать интерфейс IObserver<T>
на собственном классе, а тут все просто и анонимно.
Теперь обсудим класс посерьезней – Observable
. Это полезный класс! Во-первых, он предоставляет статические методы для конверсии различных источников (например, события) в коллекции. Мы уже видели это на примере Observable.FromEvent()
. Помимо этого есть, например, Observable.FromAsyncPattern()
– метод который поможет создать последовательности из типичных BeginXxx/EndXxx
вызовов которые фигурируют в APM-модели. Помимо этого, путем методов расширения, класс Observer
помогает, например, делать IObservable
из IEnumerable
. Это позволит нам получать данные из обычной коллекции.
var l = new List<int> {1, 2, 3}; l.ToObservable().Subscribe(x => listBox.Items.Add(x));
Observable
, точно так же как и Observer
, позволяет создавать объект типа IObservable<T>
путем простой передачи делегата в метод Create()
.
У класса также есть интересный метод Interval()
, который позволит получать значения с регулярным интервалом времени. Например, вот кусочек кода (вся реализация тут) который скачивает веб-страницу каждые 5 секунд:
var q = from t in Observable.Interval(TimeSpan.FromSeconds(5)) let request = WebRequest.Create("http://activemesa.com") from response in request.GetResponseRx() select ReadStream(response);
Класс Observable
реазизует много методов которые как реализуют функционал LINQ, так и дополняют его. Примечательно, что все дополнения Rx которые были сделаны для IObservable
были также продублированы для IEnumerable
.
Linq to events
Итак, для поддержки Rx были добавлены новые методы расширения для последовательностей. Вот несколько их них:
Amb(first, second)
возвращает ту последовательность, которая быстрее “откликнулась”.BufferWithCount()
иBufferWithTime()
позволяют закэшировать набор элементов последовательности либо по количеству, либо в определенный временной отрезок.Catch()
в случае исключения в одной из последовательностей позволит получать данные из другой.SkipUntil()
иTakeUntil()
позволяют выдавать или пропускать результаты первой коллекции только пока вторая коллекция ничего не производит.
Одним из “шаблонных” применений Rx является реализация drag-and-drop, то есть перетаскивания, скажем, картинки мышкой по экрану. Помните насколько болезненно это делать “классичесими методами”? Ведь нужно хранить флаг нажатой кнопки, писать три обработчика событий. Неудобно.
Вот как это делается с Rx. Прежде всего, каждое событие обворачивается в IObservable
:
var mouseMove = Observable.FromEvent<MouseEventArgs>(image, "MouseMove"); var mouseLeftButton = Observable.FromEvent<MouseButtonEventArgs>(image, "MouseLeftButtonDown"); var mouseLeftButtonUp = Observable.FromEvent<MouseButtonEventArgs>(image, "MouseLeftButtonUp");
Потом, используя методы SkipUntil()
и TakeUntil()
мы генерируем последовательность, которая отражает концепцию “пока пользователь нажал левую кнопку мыши и до того момента пока он ее не отпустил”.
var draggingEvents = mouseMove.SkipUntil(mouseLeftButtonDown). TakeUntil(mouseLeftButtonUp).Repeat();
Далее можно сгенерировать набор точек (x, y)
как разницу между текущими и предыдущими координатами, и потом собственно двигать картинку (полный пример для SL тут).
var xyEvents = from pos in draggingEvents .Let( mm => mm.Zip( mm.Skip( 1 ), ( prev, cur ) => new { X = cur.EventArgs.GetPosition( this ).X - prev.EventArgs.GetPosition( this ).X, Y = cur.EventArgs.GetPosition( this ).Y - prev.EventArgs.GetPosition( this ).Y } ) ).Repeat() select pos; xyEvents.Subscribe( p => { Canvas.SetLeft(image, Canvas.GetLeft(image) + p.X); Canvas.SetTop(image, Canvas.GetTop(image) + p.Y); });
Заключение
Нужна еще не одна статья чтобы показать для чего полезен Rx и какая дополнительно в нем существует инфраструктура. Надеюсь то короткое описание что я привел дает хотя бы общее представление о том, что такое Rx. Главное, что фреймворк сам по себе достаточно прост и вы можете изучить его и внедрить в проект в течении одного дня. Удачи! ■
Обновление от 26/03/2010
Код, который я привел для drag-and-drop выше работает для Silverlight, но почему-то отказывается работать под WPF. Причины мне не известны, но вот код который нормально работает под WPF:
var mouseDown = Observable.FromEvent<MouseButtonEventArgs>(image, "MouseDown") .Select(evt => evt.EventArgs.GetPosition(image)); var mouseUp = Observable.FromEvent<MouseButtonEventArgs>(image, "MouseUp"); var mouseMove = Observable.FromEvent<MouseEventArgs>(image, "MouseMove") .Select(evt => evt.EventArgs.GetPosition(this)); var q = mouseDown.SelectMany(imageOffset => mouseMove.TakeUntil(mouseUp), (imageOffset, pos) => new Point { X = pos.X - imageOffset.X, Y = pos.Y - imageOffset.Y }); q.Subscribe(Observer.Create<Point>(p => { Canvas.SetLeft(image, p.X); Canvas.SetTop(image, p.Y); }));
Оставить комментарий