Дмитpий Hecтepук

Блог о программировании — C#, F#, C++, архитектура, и многое другое

NServiceBus для data acquisition

20 комментариев

В последнее время я написал несколько постов на близкие мне темы, такие как web mining (практика вытаскивания данных из интернета) и event brokers (они же брокеры событий, или аггрегаторы, как кому понятней). Это, если хотите, слой бизнес-логики. В этом же посте я хочу рассказать о том, как реализована интеграция различных подсистем, которые участвуют в получении и обработке данных. Я буду использовать фреймворк NServiceBus в качестве messaging-шины. Примеры кода можно найти тут: http://bitbucket.org/nesteruk/datagatheringdemos.

Что такое ESB?

Для начала давайте определимся, зачем нужна эта передача сообщений. Все наши системы так или иначе являются отделенными (decoupled) друг от друга. Например, одна система занимается скачиванием данных. Другая занимается обработкой этих данных – так мы получаем возможность одни и те же данные пускать под разные цели. Еще одна система занимается хранением этих данных и предоставлением доступа к ним. Ну и наконец у нас есть программы которые показывают эти данные на экране.

Существует море вариантов для взаимодействия этих систем, и обычно для таких сценариев применяется нечто под названием Enterprise Service Bus (ESB), т.е. шина для корпоративных сервисов. ESB – это инфраструктурное решение, которое позволит вашим сервисам и приложениям работать друг с другом по одному стандартизованному методу. В случае с библиотекой NServiceBus, которую я предлагаю использовать, используется достаточно древняя технология MSMQ.

Сценарий

Сразу строить сложную архитектуру сложно, поэтому представим что у нас есть две вещи – сервис который получает откуда-то данные (например “сдирает” их с сайтов), и сервис который получает эти данные и записывает их в базу. Представим себе что обработки эти данные не требуют и что их можно записывать в базу “в чем мать родила”. Внимание вопрос: как первый сервис будет передавать данные второму?

Именно для этого нам и пригодится NServiceBus. NSB позволит первому сервису отслылать сообщения о новых данных в общую очередь, из которой их будет брать второй сервис и уже что-то делать с этими данными (например, записывать их в базу). В нашем сценарии, мы представим что первый сервис будет получать информацию о человеке (Person) и передавать ее второму.

Структура и зависимости

Итак, с чего же начать? Во-первых, нужно скачать NServiceBus – это всего лишь библиотечка (DLL). Другие библиотеки нам не нужны, так что можно начать думать о том, как организовать проекты. Нам нужно как минимум три проекта:

  • Проект для отсылки данных по шине
  • Проект для получиния данных по шине
  • Проект для общих структур, таких как Person

В отличии от, например, WCF, мы можем позволить себе использовать обычный POCO-объект… хехе, ну не совсем, точнее “совсем не POCO” :( Этот объект пройдет по шине MSMQ и окажется в очереди, из которой мы будем его забирать. Вот собственно наш Person:

[Serializable]
public class Person : IMessage
{
  public string Name { get; set; }
  public int Age { get; set; }
}

Ну ладно, я в принципе готов признать что по сути дела тот кто реализует IMessage тот не POCO, а мы просто в нашем случае посылаем Person без контейнера. Фактически, мои претензии связяны с тем, что мне лень делать некий класс UpsertPersonCommand который бы аггрегировал Person. Я в принципе не против CQRS как такового, просто в моем случае все проще.

Сервисы отправления и получения чуть более сложны в реализации, давайте посмотрим на них.

Сервис выcылки данных

Окей, есть у нас класс Person который является сообщением т.к. “реализует” пустой интерфейс IMessage. Сам сарвис высылки данных – это консольное приложение, в котором две важные части – конфигурация NServiceBus в XML и конфигурация его же в коде. Запутались? Я тоже.

Происходит все примерно так. Сначала мы в XML-файле конфигурируем те очереди MSMQ которые используются для высылки данных и для подписок. Например:

<MsmqTransportConfig
  InputQueue="activemesa.test.input"
  ErrorQueue="activemesa.error"
  NumberOfWorkerThreads="1"
  MaxRetries="1"
/>
<MsmqSubscriptionStorageConfig Queue="activemesa.subscriptions" />

Конфигурация выше означает что наша программулина будет публиковать сообщение в очередь activemesa.test.input, а подписки будут в activemesa.subscriptions. Тут стоит пояснить – pub/sub в NServiceBus работает с помощью подписок, которые хранятся либо в MSMQ очереди (как у нас) либо в базе данных SqLine (вот откуда тут NHibernate, кстати).

Теперь конфигурация в коде – это та часть где мы описываем какие услуги мы хотим от NServiceBus. Это fluent interface с горой настроек, и штука это весьма капризная. Вот как выглядят настройки для примитивнейшего сервиса:

public class Program
{
  public IBus Bus { get; set; }
  public ILog Log = LogManager.GetLogger(typeof (Program));
  void SendSomething()
  {
    Bus = Configure.With()
      .DefaultBuilder()
      .Log4Net()
      .XmlSerializer()
      .MsmqSubscriptionStorage()
      .MsmqTransport()
        .IsTransactional(true)
        .PurgeOnStartup(false)
      .UnicastBus()
        .ImpersonateSender(true)
      .CreateBus()
      .Start();
  }
}

Используя DI (в нашем случае – ненавистный Spring), NServiceBus инициализирует нам логгер и собственно саму шину (Bus). Опубликовать что-то на шине – проще простого.

var p = new Person();
Random r = new Random();
p.Name = new string(Enumerable.Range(0, r.Next(5, 10)).
  Select(i => (char)('a' + r.Next(26))).ToArray());
p.Age = r.Next(20, 80);
Bus.Publish(p);

Внимание: прежде чем что-то публиковать, нужно чтобы кто-то на это подписался. Если вы начнете публиковать без подписок (см. очередь activemesa.subscriptions), все ваши сообщения пропадут.

Слушатель

В принципе, слушатели в NServiceBus имеют свой хост под названием NServiceBus.Host.Exe – я люблю обобщенные хосты, но если учесть что свой собственный хостинг более гибкая штука, хочу показать именно его. И то и то решение занимает около 3х минут, так что особой разницы нет, разве что я на практике люблю использовать MEF чтобы была хорошая pluggable architecture. Впрочем, обобщенный хост с MEF я уже показывал, обойдемся консолью.

Итак, прежде всего, у слушателей другая XML-конфигурация:

<UnicastBusConfig>
  <MessageEndpointMappings>
    <add Messages="SimpleMessagingCommon" Endpoint="activemesa.test.input"/>
  </MessageEndpointMappings>
</UnicastBusConfig>
<MsmqSubscriptionStorageConfig Queue="activemesa.subscriptions" />

Мораль в том что слушателю надо указать с какой очереди слушать сообщения, ну и где подписываться на все это дело.

Теперь конфигурация… ее в нашем случае мы поместим в Main(). Она почти такая же как и для публикатора, с одним отличием:

Bus = Configure.With()
  .DefaultBuilder()
  .Log4Net()
  .XmlSerializer()
  .MsmqSubscriptionStorage()
  .MsmqTransport()
    .IsTransactional(false)
  .UnicastBus()
    .ImpersonateSender(false)
    .LoadMessageHandlers()
  .CreateBus()
  .Start();

Важная строчка тут LoadMessageHandlers() которая загрузит из вашей сборки все “слушатели”. Если ее пропустить, никто не подпишется на сообщения. Теперь перейдем собственно к слушателям – каждый слушатель реализует интерфейс IMessageHandler<T>, где T – тип сообщения. Соответственно наш слушатель сообщений Person выглядит вот так:

public class PersonMessageHandler : IMessageHandler<Person>
{
  public void Handle(Person message)
  {
    Program.Log.InfoFormat("Got a message with {0}", message);
  }
}

Мне кажется, более простого API не существует в природе. На этом этапе можно например взять Person и сделать upsert в базу, а можно например обработать/трансформировать и только потом передать дальше по шине на сериализацию.

Вот собственно и все – можно запускать оба приложения (см. примеры на BitBucket) и одно будет передавать сообщения другому

Сценарии использования

Итак, я коротко показал насколько просто передвигать данные из одного приложения в другое. На самом деле, в качестве публикатора часто выступает сервис который периодически вытаскивает данные отдуда-либо, в то время как на принимающем конце часто сидят две вещи – сервис который сериализует данные в базу, или программа которая визуально показывает изменения в то время как они происходят. У меня это конечно же MdxConsole.

Я привел примеры использования NServiceBus pub/sub на одной машине, что сравнительно легко по сравнению с реальными сценариями, в которых может быть несколько машин (как реальных так и виртуальных) которые находятся за всякими AD и прочими. При серьезном использовании NServiceBus начинаются проблемы с файерволлами, с поддержкой в XP (если вы все еще используете эту систему), а также с AD, VPN и прочими инфрастуктурными барьерами.

Напоследок, хочу добавить что на практике порой поверх той инфраструктуры что предлагает NSB приходться строить “песочные замки” дабы реализовать такое поведение как ручное перекладывание сообщений их одной очереди в другую, самостаятельная их обработка через System.Messaging и много других неприятностей. Но вот для простых задач NServiceBus работает очень хорошо, не требует слишком много умственной активности (правда иногда возникают просто невменяемые проблемы), и вообще… NSB себя уже тысячу раз доказал в более сложных проектах чем мои, так что я продолжаю им пользоваться и вам рекомендую.

Advertisements

Written by Dmitri

12 июля 2010 в 16:15

Опубликовано в .NET, NServiceBus

комментариев 20

Subscribe to comments with RSS.

  1. Дмитрий, спасибо за статью!

    У меня к Вам вопрос — почему Вы не исользуете WCF — там же проще простого создать такой обмен сообщениями.

    И еще, предложение «В отличии от, например, WCF, мы можем позволить себе использовать обычный POCO-объект» неверно, WCF позволяет использовать POCO-объекты.

    Также, вот статья в тему WCF&MSMQ http://code.msdn.microsoft.com/msmqpluswcf

    Иван

    12 июля 2010 at 18:06

    • Ну про POCO я имел ввиду что я не хочу использовать никакие атрибуты или интерфейсы для класса-сущности. Судя по всему без этого никак ни в NSB ни в WCF.

      Dmitri

      12 июля 2010 at 18:19

      • Ошибаетесь. Начиная с .NET 3.5 SP1 поддерживается работа с POCO-объектами без каких-либо атрибутов.

        Иван

        12 июля 2010 at 23:06

  2. Поясните плиз
    >Другая занимается обработкой этих данных – так мы получаем возможность одни и те же данные пускать под разные цели.

    И вобще, не будет проще и эффективнее, если сборшик будет сам же и записивать то, что он собрал в базу? Если, нужно, можно же просто рассылать другим сервисам уведомления о том, что пришли новые данные.

    tracker

    12 июля 2010 at 18:58

    • Попробую пояснить. Вот возьмем процесс сборки данных из сайтов. Этот процесс — по сути дела ETL (я это упомянул вот тут), т.е. 3 этапа — получить данные, обработать их и сохранить. Если все происходит на одном компьютере, тогда все ок — у меня есть решения которые так и работают — generic WatiN host получает тот или иной сайт, плагин его трансформирует на базе подготовленных XSLT, результат пишется в базу. Все ок.

      Проблема в том, что во-первых не всегда все элементы colocated, то есть например database сервер отделен и нужно ему мессаджить как-то. Понимаю что есть варианты и с WCF и с SOAP, но NSB — это просто и эффективно. Следующая проблема — иногда нужно заменять одну из этих частей, при этом не выключая другую — например захотелось мне тут недавно мигрировать часть персистенса с SQL Server на MongoDB — в это время продолжает работать и серсис записи SQL Server и впаралель я могу развернуть Generic NSB Host для сериализации в MongoDB.

      Если коротко — с физическим разделением архитипичной «трехзвенки» приходит очень большая гибкость, которая весьма полезна в требовательных enterprise-решениях.

      Dmitri

      12 июля 2010 at 19:11

      • > Проблема в том, что во-первых не всегда все элементы colocated, то есть например database сервер отделен и нужно ему мессаджить как-то. Понимаю что есть варианты и с WCF и с SOAP, но NSB – это просто и эффективно
        Ну тут понятно, хотя, по мне, отказ от WCF выглядит не слишком мотивировано.

        >например захотелось мне тут недавно мигрировать часть персистенса с SQL Server на MongoDB – в это время продолжает работать и серсис записи SQL Server и впаралель я могу развернуть Generic NSB Host для сериализации в MongoDB.
        Но ведь для переключения на новый сервис записи сборщик все равно придется перезаппускать? Получается тоже самое, как если бы мы собрали новую версию сборщика, с поддержкой новой базы данных.

        Мое мнение — что тут бы просто хватило связать севисы с помощью WCF, но попробовать что-то новое всегда здорово ;)

        Мне кажется, что осовное преимущество использования того же MSMQ в том, что если распределенное приложение развернуто в неоднородной среде и сосотоит из значительного количества сервисов, то с помощью QS его легче конйигурировать. Тоесть не надо для каждого сервиса прописывать физческе адреса соседей, достаточно настроить доступ к QS. Други преимуществ я не нашел, буду рад, если укажете на них.

        tracker

        12 июля 2010 at 20:07

        • Проблема в том, что во-первых не всегда все элементы colocated, то есть например database сервер отделен и нужно ему мессаджить как-то. Понимаю что есть варианты и с WCF и с SOAP, но NSB – это просто и эффективно
          Ну тут понятно, хотя, по мне, отказ от WCF выглядит не слишком мотивировано.

          Насколько я понимаю, тот же pub-sub в WCF мы не получаем бесплатно, а нужно писать самому. Даже есть учесть что все уже по 100 раз сделано, все равно полезно иметь готовое и протестированое.

          например захотелось мне тут недавно мигрировать часть персистенса с SQL Server на MongoDB – в это время продолжает работать и серсис записи SQL Server и впаралель я могу развернуть Generic NSB Host для сериализации в MongoDB.
          Но ведь для переключения на новый сервис записи сборщик все равно придется перезаппускать? Получается тоже самое, как если бы мы собрали новую версию сборщика, с поддержкой новой базы данных.

          Нет, сборщик-то точно не придется. Он как публиковал в одну очередь так и публикует. Другое дело что теперь из этой очереди «забирают» два клиента.

          Мое мнение – что тут бы просто хватило связать севисы с помощью WCF, но попробовать что-то новое всегда здорово ;)

          Я не спорю — с WCF вполне реально. Да и сам NSB умеет вывешивать WCF-эндпоинты, если на то пошло.

          Мне кажется, что осовное преимущество использования того же MSMQ в том, что если распределенное приложение развернуто в неоднородной среде и сосотоит из значительного количества сервисов, то с помощью QS его легче конйигурировать. Тоесть не надо для каждого сервиса прописывать физческе адреса соседей, достаточно настроить доступ к QS. Други преимуществ я не нашел, буду рад, если укажете на них.

          Конфигурировать NSB действительно намного проще чем WCF, что собственно неудивительно если учесть что WCF это очень обобщенный фреймворк. А еще количество проблем с аутентификацией в WCF (если голыми руками делать) просто удручает, в то время как NSB хоть что-то пытается делать… хотя у меня наверное однобокий опыт.

          Dmitri

          12 июля 2010 at 20:34

  3. Я извиняюсь за оффтоп, но эти трехбуквенные сокращения уже начинают надоедать и путать. :) С той ESB, про которую речь идет в статье, я поработал больше года назад. В принципе, идея правильная и главное что — удобная: если я захочу сменить один сервис другим, то мне надо всего лишь в новом сервисе поддержать формат общения.
    А вот недавно начал копаться в другом ESB — External BLOB Storage. Это вообще не касается данной темы, но я уже начинаю путать понятие ESB.
    ЗЫ Вот уж эти сокращения…

    Александр

    13 июля 2010 at 9:15

  4. Хм, а мне кажется, что существование NSB в большей степени обусловлено тем, что она разрабатывалась тогда, когда никакого WCF еще не было. Целесообразность использования NSB сейчас действительно неочевидна. Да, в WCF стремились к обобщению на сколько это было возможно, и это очень клево. Могучие фичи WCF, доступные для любого транспорта, вполне бесплатны. Рантайм расширяем. В случае с NSB шаг в сторону будет означать куда большее погружение в велосипедостроение.

    ЗЫ В WCF 4 простой Pub/Sub делается без кода вообще (привет роутинг).

    Pavel Fedulov

    13 июля 2010 at 14:00

    • > В WCF 4 простой Pub/Sub делается без кода вообще (привет роутинг).
      Интересно, ткните в пример, если не сложно.

      tracker

      13 июля 2010 at 18:31

  5. […] o Использование NServiceBus для получения и обработки данных […]

  6. знаю — оффтоп!
    Но вся тема NSB это MSMQ, который отказо-устойчовый по определению. В этом и есть суть ESB+SOA. Все остальные варианты — зависимы от работоспасобности системы.
    В MSMQ нет ожидания ответа как в WS или WCF. Кинул мессаже и свободен. Даже one-way в WCF не сидит рядом, т.к. появляется зависимость между передающей системой и получающий.А это на секундочку противоречет понятиям SOA.

    Bill Guest

    16 октября 2010 at 23:56

  7. […] Использование NServiceBus для получения и обработки данных […]

  8. Спасибо за интересную статью. Такой вопрос непонятен. Допустим приложение содержащее шину было перезапущено и все подписки утеряны. Как можно восстановить подписки в таком случае?

    Alex

    23 июля 2012 at 10:20

  9. что такое «upsert в базу»?

    Наиль

    14 октября 2012 at 21:45

    • Upsert = update or insert. То есть вставить запись если ее нету, или обновить если есть. В некоторых DBMS (например SQL SERVER) есть для этого специальные операции (например MERGE DML).

      Dmitri

      14 октября 2012 at 21:46

  10. Стоит ли использовать NSB для масштабирования высоконагруженных приложений?

    Наиль

    14 октября 2012 at 21:46

    • Как шину для передачи сообщений — определенно стоит, хотя есть сценарии когда MSMQ не совсем подходит.

      Dmitri

      14 октября 2012 at 21:47

  11. Дмитрий, здравствуйте!
    Что по вашему нужно делать, когда есть около десяти WCF-сервисов, между которыми много связей (не каждый с каждым конечно, но много)? Сделать один класс, в который мэппить (в отдельном сервисе) все входные и выходные контракты общающихся сервисов? Упростит ли NServiceBus организацию взаимодействия между сервисами? На данный момент не знаком с NSB. Еще наткнулся на .net решение MassTransit. Не знакомы с ним?

    Ну и последний оффтоп-вопрос=) Почему Spring ненавистен?=)

    Игорь Г.

    26 февраля 2013 at 13:00

    • Event bus конечно может упростить, и неважно какой. С Mass Transit не работал, увы. А Spring ненавистен только на .NET где и без него все хорошо, а в Java стеке это вроде как must have.

      Dmitri

      26 февраля 2013 at 15:22


Оставить комментарий

Заполните поля или щелкните по значку, чтобы оставить свой комментарий:

Логотип WordPress.com

Для комментария используется ваша учётная запись WordPress.com. Выход / Изменить )

Фотография Twitter

Для комментария используется ваша учётная запись Twitter. Выход / Изменить )

Фотография Facebook

Для комментария используется ваша учётная запись Facebook. Выход / Изменить )

Google+ photo

Для комментария используется ваша учётная запись Google+. Выход / Изменить )

Connecting to %s

%d такие блоггеры, как: