Обзор Реактивных Расширений (Reactive Extensions, Rx)

Важно: материал в этом посте немного устарел, но у меня есть классный видеокурс по современным Reactive Extensions, который покрывает как базовые так и продвинутые сценарии.

RxУверен что многие из вас заметили как на CodeBetter участились посты на тему Reactive Extensions. Мне кажется что имеет смысл попробовать рассказать в одном посте про самые главные особенности этого фреймворка, что я и собираюсь сделать. Также, я расскажу про новые фичи LINQ.

Введение

Reactive Extensions или Rx (по аналигии с PFX) или “реактивные расширения” – это библиотека, точнее набор библиотек, которые позволяют работать с событиями и асинхронными вызовами в композиционном стиле, в т.ч. через Linq. Цель этого фреймворка – помочь разработчикам писать код в котором фигурирует асинхронное взаимодействие.

Rx был создан в Microsoft DevLabs. На данный момент, фреймворк поддерживает .Net 3.5SP1, .Net 4RC, Silverlight 3 и JavaScript (!!!). В отличии от того же Parallel Extensions, Rx не войдет в состав .Net 4, хотя ключевые его интерфейсы IObserver<T> и IObservable<T> туда попадут.

Фреймворк Rx состоит из шести пяти сборок, одна из которых – PFX (System.Threading.dll). Если вам нравится подключать 5 сборок в проект – пожалуйста – но я рекоммендую просто делать ILMerge и использовать одну большую толстую сборку ReactiveExtensions.dll.

N.b.: у меня аллергия на многосборочность, поэтому я и вам советую скачать ILmerge и чуть-чуть упростить свою разработку.

Подключив сборку в проект, вы получите не только новые классы и интерфейсы, но также методы расширения.

Идеология Rx

Менталитет Rx сводится к так называемым push-коллекциям. Разница примерно такая: pull-коллекции предоставляют свое содержимое по запросу – вы вызываете MoveNext() пока не закончатся элементы. Push-коллекции же сами “толкают” элементы в вашем направлении – вам только нужно подписаться на выдачу элементов. При этом, когда я говорю “элементы” я не имею ввиду только элементы коллекций. Например, если происходит событие, то push-коллекция в контексте Rx может выдавать аргументы события. Или результат асинхронного вызова. Вообщем, приложение становится “реактивным”, то есть учится реагировать на события и аггрегировать их для более удобного использования.

Rx предлагает два новый интерфейса – IObserver<T> и IObservable<T>. Эти интерфейсы используются для реализции объектов со встроенной поддержкой “реактивности”, хотя на самом деле Rx можно использовать и без их прямого использования.

Итак, возьмем минималистичный пример – подписку на событие. Например, есть у нас события передвижения мыши и мы хотим записать первые десять сгенерированных событий в ListBox. Делается это так:

var o = Observable.FromEvent<MouseEventArgs>(
  this, "MouseMove").Take(10);
o.Subscribe(x => listBox.Items.Add(
  string.Format("{0},{1}", x.EventArgs.X, x.EventArgs.Y)));

Начнем наше обсуждение с класса Observable. Этот класс позволяет конвертировать различные наборы состояний (например коллекции IEnumerable или результаты событий) в push-коллекцию типа IObservable<T>. Например, в нашем случае мы конвертируем первые десять вызовов события MouseMove (литерал тут не очень к месту, но это сейчас не важно) в тип IObservable<IEvent<MouseEventArgs>>.

Теперь второе выражение, которое не столь интуитивно. Суть в том, что для того чтобы подписаться на push-нотификации от IObservable, нужно вызывать метод Subscribe() на самом IObservable. Это не очень интуитивно, но что поделать. В нашем примере мы делаем подписку на первые 10 событий MouseMove которые будут вызваны у формы. Для каждого из событий, а точнее для каждого успешного события (об этом далее), мы вызываем код который добавляет координаты в список listBox.

Подробно о IObserver и IObservable

В контексте Rx, интерфейсы IObserver и IObservable играют важную роль, даже если мы не реализуем их в наших классах. Как вы уже догадались, IObservable<T> реализует push-коллекция которая умеет выдывать объекты типа T. IObserver – это интерфейс, который реализует тот, кто подписан на уведомления от IObservable<T>.

Начнем с интерфейса IObserver. В нем всего три метода:

  • void OnNext(T value)
    Это главный метод, который вызывается в момент “поставки” нового значения. Тот делегат что мы предоставили в примере – это делегат для обработки OnNext().
  • void OnError(Exception exception)
    Этот метод вызывается когда что-то идет не так и нужно сказать наблюдателю о том, что произошла ошибка. В параметре метода хранится нужная информация о возникшем исключении.
  • void OnCompleted()
    Вызывается когда у коллекции кончились элементы для выдачи. В нашем примере выше, этот метод вызвался после 20го элемента, но поскольку мы его не обрабатывали, ничего не произошло.

Порядок вызовов обычно фиксирован – несколько OnNext(), а потом либо OnError() либо OnCompleted().

Теперь давайте обсудим IObservable<T>. Тут все в каком смысле проще – точнее так может показаться на первый взгляд. У этого интерфейса всего один метод:

  • IDisposable Subscribe(IObserver<T> observer)
    Позволяет наблюдателю подписаться на события. При этом, выдается ‘disposable token’ (это такое название паттерна, я только что придумал), то есть выдается IDisposable подписчика на котором можно в последствии вызвать Dispose() и тем самым “закрыть подписку”.

Мы уже встретили метод Subscribe() в нашем примере. Помимо поддержки этого метода для Observable и IObservable<T>, этот метод также реализован для IEnumerable<T>. В принципе, реализовать свой метод Subscribe() просто – нужно просто держать список всех подписчиков и в методе Subscribe() добавить подписчика в список и вернуть его как IDisposable:

public IDisposable Subscribe(IObserver<Location> observer) 
{
   observers.Add(observer);
   // Announce current location to new observer.
   observer.OnNext(this.Location);
   return observer as IDisposable;
}

Полный пример реализации можно посмотреть на сайте MSDN.

Observer и Observable

В отличии от интерфейсов IObserver<T> и IObservable<T>, которые находятся в сборке System.Observable, классы Observable и Observer находятся в сборке System.Reactive, являются статическими, и главной их функцией являются LINQ-образные расширения для Rx (собственно то, что некоторые называют Linq to Events). Думаю что тут не обойтись без более-менее детализированного описания того что внутри этих классов, благо в них есть весьма забавные надстройки над Linq.

Начнем с простого. Класс Observer позволяет, в первую очередь, создавать объекты типа IObserver<T> с помощью комбинации обработчиков System.Action для различных комбинаций OnNext(), OnError() и OnException(). Зачем это надо? Все очень просто – мало кому хочется реализовывать интерфейс IObserver<T> на собственном классе, а тут все просто и анонимно.

Теперь обсудим класс посерьезней – Observable. Это полезный класс! Во-первых, он предоставляет статические методы для конверсии различных источников (например, события) в коллекции. Мы уже видели это на примере Observable.FromEvent(). Помимо этого есть, например, Observable.FromAsyncPattern() – метод который поможет создать последовательности из типичных BeginXxx/EndXxx вызовов которые фигурируют в APM-модели. Помимо этого, путем методов расширения, класс Observer помогает, например, делать IObservable из IEnumerable. Это позволит нам получать данные из обычной коллекции.

var l = new List<int> {1, 2, 3};
l.ToObservable().Subscribe(x => listBox.Items.Add(x));

Observable, точно так же как и Observer, позволяет создавать объект типа IObservable<T> путем простой передачи делегата в метод Create().

У класса также есть интересный метод Interval(), который позволит получать значения с регулярным интервалом времени. Например, вот кусочек кода (вся реализация тут) который скачивает веб-страницу каждые 5 секунд:

var q = from t in Observable.Interval(TimeSpan.FromSeconds(5))
        let request = WebRequest.Create("http://activemesa.com")
        from response in request.GetResponseRx()
        select ReadStream(response);

Класс Observable реазизует много методов которые как реализуют функционал LINQ, так и дополняют его. Примечательно, что все дополнения Rx которые были сделаны для IObservable были также продублированы для IEnumerable.

Linq to events

Итак, для поддержки Rx были добавлены новые методы расширения для последовательностей. Вот несколько их них:

  • Amb(first, second) возвращает ту последовательность, которая быстрее “откликнулась”.
  • BufferWithCount() и BufferWithTime() позволяют закэшировать набор элементов последовательности либо по количеству, либо в определенный временной отрезок.
  • Catch() в случае исключения в одной из последовательностей позволит получать данные из другой.
  • SkipUntil() и TakeUntil() позволяют выдавать или пропускать результаты первой коллекции только пока вторая коллекция ничего не производит.

Одним из “шаблонных” применений Rx является реализация drag-and-drop, то есть перетаскивания, скажем, картинки мышкой по экрану. Помните насколько болезненно это делать “классичесими методами”? Ведь нужно хранить флаг нажатой кнопки, писать три обработчика событий. Неудобно.

Вот как это делается с Rx. Прежде всего, каждое событие обворачивается в IObservable:

var mouseMove = Observable.FromEvent<MouseEventArgs>(image, "MouseMove");
var mouseLeftButton = Observable.FromEvent<MouseButtonEventArgs>(image, "MouseLeftButtonDown");
var mouseLeftButtonUp = Observable.FromEvent<MouseButtonEventArgs>(image, "MouseLeftButtonUp");

Потом, используя методы SkipUntil() и TakeUntil() мы генерируем последовательность, которая отражает концепцию “пока пользователь нажал левую кнопку мыши и до того момента пока он ее не отпустил”.

var draggingEvents = mouseMove.SkipUntil(mouseLeftButtonDown).
    TakeUntil(mouseLeftButtonUp).Repeat();

Далее можно сгенерировать набор точек (x, y) как разницу между текущими и предыдущими координатами, и потом собственно двигать картинку (полный пример для SL тут).

var xyEvents = from pos in draggingEvents
                     .Let( mm => mm.Zip( mm.Skip( 1 ), ( prev, cur ) =>
                         new
                         {
                             X = cur.EventArgs.GetPosition( this ).X - 
                                 prev.EventArgs.GetPosition( this ).X,
                             Y = cur.EventArgs.GetPosition( this ).Y - 
                                 prev.EventArgs.GetPosition( this ).Y
                         } ) ).Repeat()
                     select pos;
xyEvents.Subscribe(
  p =>
  {
      Canvas.SetLeft(image, Canvas.GetLeft(image) + p.X);
      Canvas.SetTop(image, Canvas.GetTop(image) + p.Y);
  });

Заключение

Нужна еще не одна статья чтобы показать для чего полезен Rx и какая дополнительно в нем существует инфраструктура. Надеюсь то короткое описание что я привел дает хотя бы общее представление о том, что такое Rx. Главное, что фреймворк сам по себе достаточно прост и вы можете изучить его и внедрить в проект в течении одного дня. Удачи! ■

Обновление от 26/03/2010

Код, который я привел для drag-and-drop выше работает для Silverlight, но почему-то отказывается работать под WPF. Причины мне не известны, но вот код который нормально работает под WPF:

var mouseDown =
  Observable.FromEvent<MouseButtonEventArgs>(image, "MouseDown")
  .Select(evt => evt.EventArgs.GetPosition(image));
var mouseUp = Observable.FromEvent<MouseButtonEventArgs>(image, "MouseUp");
var mouseMove =
  Observable.FromEvent<MouseEventArgs>(image, "MouseMove")
  .Select(evt => evt.EventArgs.GetPosition(this));
var q = mouseDown.SelectMany(imageOffset => mouseMove.TakeUntil(mouseUp),
        (imageOffset, pos) => new Point
        {
          X = pos.X - imageOffset.X, 
          Y = pos.Y - imageOffset.Y
        });
q.Subscribe(Observer.Create<Point>(p =>
{
  Canvas.SetLeft(image, p.X);
  Canvas.SetTop(image, p.Y);
}));

16 responses to “Обзор Реактивных Расширений (Reactive Extensions, Rx)”

  1. >> На данный момент, фреймворк поддерживает .Net 3.5SP1, .Net 4RC и Silverlight 3.

    буквально на днях добавили поддержку Rx в JavaScript. на MIX по этому поводу был доклад, можно скачать видеою.

    Статья отличная, спасибо.

    1. Ага, CodeBetter как раз сейчас эту тему обсуждает. Добавлю в пост – спасибо что напомнили.

      1. кстати, с Rx for JS идет простой, но классный визуальный пример.

        рекомендую читателям скачать и посмотреть. радует и малое количество кода, которое потребовалось чтобы эффект реализовать.

  2. Да, интересно. Спасибо!

    Слежу за RX уже несколько месяцев, ибо сразу понравилось. И даже уже хочется поюзать где-нибудь, но пока как-то мозг не повернулся, чтобы понять как это юзать. :) Вроде ведь все понятно, и даже элементарно по концепции, но, в то же время, настолько необычно, что не схватываешь.

    Так что надо продолжать смотреть примеры – ожидать пока оно само в голове уложится как надо.

  3. Самого главного и не сказал, что IObserver – это вывернутый наизнанку IEnumerable

  4. Не по теме: а почему многосборочность вызывает аллергию?

    1. Если коротко – потому что легче добавить одну сборку чем несколько. Посмотрите на Rx – если не считать сборку PFX, то получится 5 сборок, которые вам все нужны (а даже если не все, вы немного потеряете, добавив все из них). То же самое практически в любой библиотеке. Поиск всех пяти в GACе – потеря времени.

      Вторая сторона монеты что заказчику очень удобно давать один EXE, а не груду всяких DLLек. Тут тоже ILmerge спасает, правда иногда приходится использовать ресурсные вставки для WPF, всяких unmanaged сборок, баз данных, и т.д.

      1. > Вторая сторона монеты что заказчику очень удобно давать один EXE, а не груду всяких DLLек.

        Да, это я выпустил из виду: чем меньше всяких “лишних” для пользователя файлов, тем лучше. Вообще давняя моя мечта – такой мегалинкер, который бы формировал из используемых .нет длл’ек эдакий минисетап фреймворка. Типа вот если я использую System и допустим System.Text, чтобы даже если у пользователя нет установленного фреймворка, то он все равно смог запустить мою программу. А то иногда бывает как-то неудобно требовать установки FW весом в 20 метров (это если про 2.0, 3.5 так сразу триста метров) ради утилиты в пару сотен кб…

    2. Ну, мне кажется сейчас пользователи с DSL готовы установить что угодно, особенно если все само установится и нужно только сидеть и ждать.

  5. var mouseDown = from evt in Observable.FromEvent(this, “MouseDown”)
    select evt.EventArgs.GetPosition(this);

    var q = from start in mouseDown
    from pos in MouseMove.StartWith(start). TakeUntil(mouseUp)
    select pos;

    почему нельзя написать:
    var q = from pos in MouseMove.StartWith(mouseDown).TakeUntil(mouseUp)
    select pos;

  6. Спасибо, Дмитрий!
    Каждый раз с большим удовольствием читаю Ваши статьи.
    Маленькие очепятки:
    ..которая быстрее “откликнулАсь”
    ..каждое событие об(в)орачивается в IObseRvable

    1. Спасибо за добрые слова! Опечатки поправил.

  7. Спасибо за отличный обзор RX

    1. Рад что понравилось!

  8. Уважаемый Дмитрий, а можно ли, наоборот, превратить push в pull? Предположим, у меня есть некий сетевой прослушиватель, который выдаёт полученные из сети данные. Выдаёт он их генерируя соответствующее событие и в методе-обработчике я могу работать с этими данными. Но такой подход не очень удобен. Дело в том, что не я запрашиваю данные, а они ко мне приходят, т.е. я не могу написать что-то вроде var data=await Receive(). И для того, чтобы была возможна привычная модель получения данных, я реализовал следующий подход: прослушиватель при получении данных помещает их в потокобезопасную очередь, а вышеупомянутый метод Receive их оттуда извлекает. При отсутствии данных происходит асинхронная блокировка. Данный подход работает, но возможно есть какие-то более правильные способы решения данной проблемы? Возможно, тут подошёл бы какой-то паттерн навроде Реактора или чего-то подобного?

  9. […] этого достаточно. Подробнее можно почитать, например, здесь или […]

Оставить комментарий