Publisher/Subscriber Implementation with WCF

The other day I was looking for a simple implementation of the Publisher/Subscriber design using WCF.

Well, I did find some samples (here and here), but none of them seemed to work correctly in my deployment. After looking for more info in Juval Lowy's Programming WCF Services book, I ended up implementing my own design from scratch.

Thought I would share it here for future reference.

The implementation consists - as expected - of 3 components:

  • The service (PublisherSubscriber2.Service)
  • The client (PublisherSubscriber2.Client)
  • The publisher (PublisherSubscriber2.Publisher)

Service

The service provides 3 simple operations: Subscribe and Unsubscribe, so that the client can subscribe to and unsubscribe from events, and PublishEvent, so that the publisher can do its job of publishing events.

The contract is as follows:

    using System;
    using System.ServiceModel;

    namespace PublisherSubscriber2.Service
    {
        /// <summary>
        /// PubSub service contract.
        /// </summary>
        [ServiceContract(SessionMode = SessionMode.Required, CallbackContract = typeof(IPubSubCallback))]
        public interface IPubSubService
        {
            /// <summary>
            /// Operation used by the subscriber to subscribe to events published.
            /// </summary>
            [OperationContract(IsOneWay = false, IsInitiating = true)]
            void Subscribe();

            /// <summary>
            /// Operation used by the subscriber to unsubscribe to events published (stop receiving subscriptions).
            /// </summary>
            [OperationContract(IsOneWay = false, IsInitiating = false, IsTerminating = true)]
            void Unsubscribe();

            /// <summary>
            /// Operation used by the publisher to publish events.
            /// </summary>
            /// <param name="eventName">String that identifies the event published.</param>
            [OperationContract(IsOneWay = true)]
            void PublishEvent(string eventName);
        }
    }

This service contract defines a callback contract that will be used to notify clients when new events are published. That contract is also pretty simple:

    using System;
    using System.ServiceModel;

    namespace PublisherSubscriber2.Service
    {
        /// <summary>
        /// Defines the callback contract used by the IPubSubService.
        /// </summary>
        public interface IPubSubCallback
        {
            /// <summary>
            /// Called when a new event is published in the PubSub service.
            /// </summary>
            /// <param name="eventName">String that identifies the event published.</param>
            [OperationContract]
            void EventPublished(string eventName);
        }
    }

The implementation of this service is where all the magic happens. The samples I mentioned before rely on delegates and events to implement the notifications. My implementation, on the other hand, uses a static list to hold all the client callbacks.

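First, the service implementation. It just forwards calls to the service adapter. Note the ServiceBehavior attribute: with InstanceContextMode.PerSession, each client session gets its own instance of the service class.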

    using System;
    using System.ServiceModel;
    using System.Collections.Generic;

    namespace PublisherSubscriber2.Service
    {
        /// <summary>
        /// PubSub service implementation.
        /// </summary>
        [ServiceBehavior(InstanceContextMode = InstanceContextMode.PerSession)]
        public class PubSubService : IPubSubService
        {
            #region Constructors

            /// <summary>
            /// Creates a new instance of the <see cref="PubSubService"/> class.
            /// </summary>
            public PubSubService()
            {
            }

            #endregion

            #region Public Methods

            /// <summary>
            /// Operation used by the subscriber to subscribe to events published.
            /// </summary>
            public void Subscribe()
            {
                PubSubServiceAdapter adapter = new PubSubServiceAdapter();
                adapter.Subscribe();
            }

            /// <summary>
            /// Operation used by the subscriber to unsubscribe to events published (stop receiving subscriptions).
            /// </summary>
            public void Unsubscribe()
            {
                PubSubServiceAdapter adapter = new PubSubServiceAdapter();
                adapter.Unsubscribe();
            }

            /// <summary>
            /// Operation used by the publisher to publish events.
            /// </summary>
            /// <param name="eventName">String that identifies the event published.</param>
            public void PublishEvent(string eventName)
            {
                PubSubServiceAdapter adapter = new PubSubServiceAdapter();
                adapter.PublishEvent(eventName);
            }

            #endregion
        }
    }

Now, the service adapter:

    using System;
    using System.ServiceModel;
    using System.Collections.Generic;
    using System.Collections.ObjectModel;
    using System.Collections;

    namespace PublisherSubscriber2.Service
    {
        /// <summary>
        /// PubSub service adapter.
        /// </summary>
        public class PubSubServiceAdapter : IPubSubService
        {
            #region Members

            private static List<IPubSubCallback> callbacks = new List<IPubSubCallback>();

            #endregion

            #region Public Methods

            /// <summary>
            /// Operation used by the subscriber to subscribe to events published.
            /// </summary>
            public void Subscribe()
            {
                // Get callback contract

                IPubSubCallback callback = OperationContext.Current.GetCallbackChannel<IPubSubCallback>();

                // Add the subscriber callback to the list of active subscribers

                if (!callbacks.Contains(callback))
                {
                    callbacks.Add(callback);
                }
            }

            /// <summary>
            /// Operation used by the subscriber to unsubscribe to events published (stop receiving subscriptions).
            /// </summary>
            public void Unsubscribe()
            {
                // Get callback contract

                IPubSubCallback callback = OperationContext.Current.GetCallbackChannel<IPubSubCallback>();

                // Remove the subscriber callback from the list of active subscribers

                if (callbacks.Contains(callback))
                {
                    callbacks.Remove(callback);
                }
            }

            /// <summary>
            /// Operation used by the publisher to publish events.
            /// </summary>
            /// <param name="eventName">String that identifies the event published.</param>
            public void PublishEvent(string eventName)
            {
                // Publish event to all active subscribers.

                CallSubscribers(eventName);
            }

            #endregion

            #region Private Methods

            /// <summary>
            /// Publishes the specified event to all active subscribers.
            /// </summary>
            /// <param name="eventName">Name of the event.</param>
            private static void CallSubscribers(string eventName)
            {
                // Build callback action

                Action<IPubSubCallback> invoke = delegate(IPubSubCallback callback)
                {
                    callback.EventPublished(eventName);
                };

                // Call callback for every registered subscriber.

                callbacks.ForEach(invoke);
            }

            #endregion
        }
    }

Here, the trick is done with the static callbacks list, which is shared by every service instance, and with the CallSubscribers method, which walks that list and notifies each subscriber. Pretty simple.
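
One thing to keep in mind with a static list: since each session gets its own service instance, several sessions may touch the list at the same time, and a subscriber whose connection has died will make its callback throw. Below is a minimal sketch of one way to guard against both, not the code used in this post. SubscriberRegistry, its members and RegistryDemo are hypothetical names, and the sketch uses plain delegates instead of WCF callback channels so that it runs on its own.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of the original post): a lock-protected
// subscriber list that drops any subscriber whose notification throws.
public sealed class SubscriberRegistry<T>
{
    private readonly object gate = new object();
    private readonly List<T> subscribers = new List<T>();

    // Adds a subscriber unless it is already registered.
    public void Add(T subscriber)
    {
        lock (gate)
        {
            if (!subscribers.Contains(subscriber))
            {
                subscribers.Add(subscriber);
            }
        }
    }

    // Removes a subscriber; returns false if it was not registered.
    public bool Remove(T subscriber)
    {
        lock (gate)
        {
            return subscribers.Remove(subscriber);
        }
    }

    public int Count
    {
        get { lock (gate) { return subscribers.Count; } }
    }

    // Calls notify for every subscriber and returns how many calls succeeded.
    public int Publish(Action<T> notify)
    {
        // Work on a snapshot so the lock is not held while subscribers run,
        // and so Add/Remove during publishing cannot break the loop.
        T[] snapshot;
        lock (gate)
        {
            snapshot = subscribers.ToArray();
        }

        int delivered = 0;
        foreach (T subscriber in snapshot)
        {
            try
            {
                notify(subscriber);
                delivered++;
            }
            catch (Exception)
            {
                // Assumption: any failure means the subscriber is gone.
                Remove(subscriber);
            }
        }
        return delivered;
    }
}

public static class RegistryDemo
{
    public static void Main()
    {
        var registry = new SubscriberRegistry<Action<string>>();
        Action<string> healthy = name => Console.WriteLine("Event received: " + name);
        Action<string> broken = name => { throw new InvalidOperationException("channel faulted"); };

        registry.Add(healthy);
        registry.Add(broken);

        int delivered = registry.Publish(subscriber => subscriber("OrderCreated"));
        Console.WriteLine("Delivered: " + delivered + ", remaining: " + registry.Count);
    }
}
```

In the adapter, T would be IPubSubCallback and the publish call would look like registry.Publish(cb => cb.EventPublished(eventName)). Catching only the communication-related exceptions, rather than every Exception, would probably be a better fit there.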

Client

My client is a simple console app just to help test the service. It has a reference to the service and implements the callback as expected.

    using System;
    using System.ServiceModel;

    namespace PubliserService2.Client
    {
        class Program
        {
            static void Main(string[] args)
            {
                using (PubSubService.PubSubServiceClient client = 
                   new PubSubService.PubSubServiceClient(new InstanceContext(new PubSubServiceCallback())))
                {
                    Console.WriteLine("Client started.");
                    Console.WriteLine();

                    client.Subscribe();
                    Console.WriteLine("Subscription completed.");
                    Console.WriteLine();

                    Console.WriteLine("Press any key to close this client.");
                    Console.WriteLine();
                    Console.ReadKey();

                    client.Unsubscribe();
                    Console.WriteLine("Unsubscription completed.");
                    Console.WriteLine();

                }

                Console.WriteLine("Connection closed.");
                Console.WriteLine();

                Console.WriteLine("Press any key to shutdown this client.");
                Console.ReadKey();
            }
        }
    }

The callback implementation:

    using System;

    namespace PubliserService2.Client
    {
        public class PubSubServiceCallback : PubSubService.IPubSubServiceCallback
        {
            #region Public Methods

            public void EventPublished(string eventName)
            {
                string description = string.Format("Event received: {0}", eventName);
                Console.WriteLine(description);
            }

            #endregion
        }
    }

It's worth mentioning here that this implementation of the callback contract doesn't call back into the service. If that were required - for example, to call another operation and obtain data for the event received - it would be necessary to apply the CallbackBehavior attribute to this class:

    [CallbackBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant)]

Publisher

The publisher is just a straightforward client of the service:

    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.ServiceModel;

    namespace PublisherSubscriber2.Publisher
    {
        class Program
        {
            static void Main(string[] args)
            {
                using (PubSubService.PubSubServiceClient client =