Брокеры событий, часть 2

RxВ моем предыдущем посте я описал структуру простого брокера событий с использованием только лишь языка C# и переведения классов от событий к передаче сообщений, тем самым оставив позади ключевое слово event и всю ту неразбериху, которая обычно его сопровождает. В этой части поста мы посмотрим на библиотеку Reactive Extensions и то как ее можно подключить к брокеру дабы получать выборки из сообщений а также отписку по IDisposable.

Subscription

Прежде всего, имеет смысл рассматривать подписку не как некий KeyValuePair, а как IDisposable, с тем пониманием что подписка существует только тогда, когда существует подписчик, и что со смертью подписчика было бы неплохо отписаться и от самого события. Поэтому берем и пишем самый простую возможную реализацию:

private class Subscription : IDisposable
{
  private readonly EventBroker broker;
  public IObserver<EventArgs> Subscriber { get; private set; }
  public Subscription(EventBroker broker, IObserver<EventArgs> subscriber)
  {
    this.broker = broker;
    this.Subscriber = subscriber;
  }
  public void Dispose()
  {
    broker.Unsubscribe(Subscriber);
  }
}

Класс это приватный, он живет внутри брокера. Является хранилищем ссылки на подписчика. Как только происходит Dispose() на эту подписку, она дает комманду брокеру отписать наблюдателя. Эта подписка хранится в брокере и выдается подписчику на тот случай если подписчик захочет в последствии “отдать концы”.

Брокер

Наш брокер теперь реализует IObsevable<T>, что позволяет подписчику выполнять разные злостные операции прямо на брокере до того как подписываться. Это звучит фантастично, но если думать о LINQ как о генераторе некого прокси между “всеми объектами” брокера и только теми что нам нужны, становится более понятно.

Итак, в брокере у нас 2 поля – список подписчиков (именно список, а не HashSet, как бы этого не хотелось) и ReaderWriterLockSlim на случай если брокер используется из нескольких потоков.

class EventBroker : IObservable<EventArgs>
{
  private readonly List<Subscription> subscribers = new List<Subscription>();
  private readonly ReaderWriterLockSlim myLock = new ReaderWriterLockSlim();
  ...
}

У брокера три метода. Первый – Subscribe() позволяет кому угодно после массовых Linq-манипуляций таки подписаться на push-коллекцию событий, исходящих от этого брокера. Подписка требует проверки уже существующих подписок, что вносит некую неразбериху в и без того сложный код:

public IDisposable Subscribe(IObserver<EventArgs> subscriber)
{
  Subscription sub = new Subscription(this, subscriber);
  myLock.EnterUpgradeableReadLock();
  try
  {
    if (!subscribers.Any(s => s.Subscriber == subscriber))
    {
      myLock.EnterWriteLock();
      try
      {
        subscribers.Add(sub);
      }
      finally {
        myLock.ExitWriteLock();
      }
    }
  } finally
  {
    myLock.ExitUpgradeableReadLock();
  }
  return sub;
}

Как видите, возвратное значение, которое продиктовано интерфейсом IObservable<T> имеет тип IDisposable – вот почему нам пришлось делать отдельный Subscription. Суть этого простая – если от события (а точнее потока событий) нужно отписаться, можно просто сделать Dispose()

Вот как выглядит “отписка”:

public void Unsubscribe(IObserver<EventArgs> subscriber)
{
  myLock.EnterWriteLock();
  try
  {
    subscribers.RemoveAll(s => s.Subscriber == subscriber);
  }
  finally
  {
    myLock.ExitWriteLock();
  }
}

Тут нет ничего необычного. Этот метод можно вызывать самому, но его также вызывает Subscription в момент удаления. Кому как удобней.

Ну и наконец метод публикации. Тут тоже нет ничего необычного:

public void Publish<T>(T args) where T : EventArgs
{
  myLock.EnterReadLock();
  try
  {
    foreach (var s in subscribers)
      s.Subscriber.OnNext(args);
  }
  finally { myLock.ExitReadLock(); }
}

Игрок

Игрок у нас продолжает держать ссылку на брокер. Там где надо, он использует метод EventBroker.Publish для того чтобы известить всех кто подписан, что он забил мяч:

class FootballPlayer
{
  public string Name { get; set; }
  [Dependency]
  public EventBroker EventBroker { get; set; }
  public void Score()
  {
    Console.WriteLine("{0} scored!!!", Name);
    EventBroker.Publish(new GenericEventArgs(this, Name));
  }
}

Тренер

Тренер оформляет подписки через брокер у себя в конструкторе. При этом мы, обнаглев, можем использовать Linq. Например, можно смело взять и отфильтровать только те типы событий, которые имеют тип GenericEventArgs:

class FootballCoach
{
  private readonly EventBroker broker;
  public FootballCoach(EventBroker broker)
  {
    broker.OfType<GenericEventArgs>().Subscribe(args => 
      Console.WriteLine("Well done, {0}!", args.Data));
  }
}

Для полноты картины вот он, этот тип:

class GenericEventArgs : EventArgs
{
  public GenericEventArgs(object sender, string data)
  {
    Sender = sender;
    Data = data;
  }
  public object Sender { get; set; }
  public string Data { get; set; }
}

А дальше, собственно, все как обычно – то же подключение через конейнер:

var uc = new UnityContainer();
uc.RegisterType<EventBroker>(new ContainerControlledLifetimeManager());
var p = uc.Resolve<FootballPlayer>();
p.Name = "Arshavin";
var c = uc.Resolve<FootballCoach>();
p.Score();

Зачем это надо?

Кому-то может показаться, что мы поменяли шило на мыло – точно так же как и в предыдущем посте, нам приходится пробрасывать брокер прямо в сущности. Даже если бы это был не сам брокер а скажем некий ISubject<T> (это такой интерфейс в Rx для классов которые и подписываются и публикуют), все равно это не очень опрятно.

С другой стороны, мы получили одно достаточно серьезное приемущество – возможность использовать Linq-комбинаторы для сложных преобразований. Например, если тренер начинает замечать игрока только с 3го забитого мяча, это можно описать вот так:

broker.OfType<GenericEventArgs>().Skip(2).Take(5).Subscribe(args =>
  Console.WriteLine("Well done, {0}!", args.Data));

Во время тестирования кода я налетел на небольшой казус: вызов OnNext() при публикации в случае если это последний элемент и подписчик хочет отписаться автоматически ведет за собой вызов Unsubscribe() из подписки и последующая попытка получить write lock с уже полученным read lock. К счастью, эта проблема просто решается с помощью базовых конструктов из PFX.

Заключение

Написать нормальный брокер оказалось не так уж и просто. В следующем посте мы продолжим смотреть на различные брокеры, а пока советую помнить фразу “no silver bullet”, которая очень аккуратно описывает нашу ситуацию.

8 responses to “Брокеры событий, часть 2”

  1. -Событие в пределах брокера характеризуется типом?
    -Почему все-таки не HashSet?
    -Сериализации уведомлений можно было бы избежат копированием subscribers в пределах блокировки чтения и вызовом OnNext вне ее.

    1. – Событие – да, типом который от EventArgs наследует. Это конечно механика, можно любой классификатор воткнуть, но так проще.

      – Потому что тогда нужно реализовывать IEquatable на Subscription, а я поленился.

      – Да, до этого я итак додумался. :)

  2. Не совсем понятно, откуда у EventBroker broker взялся экстеншн OfType()?

    А так же:
    public void Publish(T args) where T : EventArgs
    будет ли аналогом
    public void Publish(EventArgs args)
    Ведь в теле метода у нас T не используется

    Ещё:
    Subscribe(args => Console.WriteLine(“Well done, {0}!”, args.Data));
    Но при этом Subscribe(IObserver subscriber), т.е. метод принимает объект типа IObserver, а не Func или Expression. Как так?

    1. Блог сжирает символы треугольных скобок, омг %)

    2. Использование T или object равноценно с одной лишь поправкой что на T можно поставить ограничитель. Хотя полиморфность тоже подойдет.

      Что касается Subscribe, то тут нужно вникать в суть Rx. Мораль в том что _наш_ брокер удовлетворяет IObservable, но когда мы используем Linq, мы получаем кучу перегрузок, в т.ч. ту которая берет OnNext :)

      1. 1. Ну я про конкретно эту реализацию и спрашивал, ага. Не генерик-вариант получается немного лаконичнее.

        2. А, pixie dust ))

        Ок, мерси

  3. Дмитрий Avatar
    Дмитрий

    Буквально пара копеек :)

    1. Если речь идет про 4-й фреймворк, то есть смысл посмтореть на System.Collections.Concurrent.ConcurrentBag и ко

    2. Эта реализация брокера может завалить систему:
    2.1. он один на весь контейнер, и если контейнер один на апликуху, то блокировки на подписке положат всё
    2.2. зачем тренеру знать о событиях йух пойми каких игроков

    З.Ы. Хочу предложить тему для альтнет –
    http://parallelpatterns.codeplex.com/
    http://code.msdn.microsoft.com/ParExtSamples

    1. 1. Да, в принципе что-то подобное и надо использовать. Или менять в корне модель использования.

      2.1 Согласен

      2.2 Ему действительно нужно знать. К тому же, это всего лишь пример.

      Тему для подкастов примем к рассмотрению. Спасибо.

Оставить комментарий