@@ -114,6 +114,7 @@ public Subscription(Subscription template, bool copyEventHandlers)
114114 m_publishStatusChanged = template . m_publishStatusChanged ;
115115 m_fastDataChangeCallback = template . m_fastDataChangeCallback ;
116116 m_fastEventCallback = template . m_fastEventCallback ;
117+ m_fastKeepAliveCallback = template . m_fastKeepAliveCallback ;
117118 }
118119
119120 // copy the list of monitored items.
@@ -454,6 +455,19 @@ public FastEventNotificationEventHandler FastEventCallback
454455 set => m_fastEventCallback = value ;
455456 }
456457
458+ /// <summary>
459+ /// Gets or sets the fast keep alive callback.
460+ /// </summary>
461+ /// <value>The keep alive change callback.</value>
462+ /// <remarks>
463+ /// Only one callback is allowed at a time but it is more efficient to call than an event.
464+ /// </remarks>
465+ public FastKeepAliveNotificationEventHandler FastKeepAliveCallback
466+ {
467+ get => m_fastKeepAliveCallback ;
468+ set => m_fastKeepAliveCallback = value ;
469+ }
470+
457471 /// <summary>
458472 /// The items to monitor.
459473 /// </summary>
@@ -1903,15 +1917,28 @@ private async Task OnMessageReceived()
19031917 // get list of new messages to process.
19041918 List < NotificationMessage > messagesToProcess = null ;
19051919
1920+ // get list of new messages to process.
1921+ List < IncomingMessage > keepAliveToProcess = null ;
1922+
19061923 // get list of new messages to republish.
19071924 List < IncomingMessage > messagesToRepublish = null ;
19081925
19091926 lock ( m_cache )
19101927 {
19111928 for ( LinkedListNode < IncomingMessage > ii = m_incomingMessages . First ; ii != null ; ii = ii . Next )
19121929 {
1930+ // process keep alive messages
1931+ if ( ii . Value . Message == null && ! ii . Value . Processed )
1932+ {
1933+ if ( keepAliveToProcess == null )
1934+ {
1935+ keepAliveToProcess = new List < IncomingMessage > ( ) ;
1936+ }
1937+ keepAliveToProcess . Add ( ii . Value ) ;
1938+ }
1939+
19131940 // update monitored items with unprocessed messages.
1914- if ( ii . Value . Message != null && ! ii . Value . Processed &&
1941+ else if ( ii . Value . Message != null && ! ii . Value . Processed &&
19151942 // If sequential publishing is enabled, only release messages in perfect sequence.
19161943 ( ! m_sequentialPublishing || ii . Value . SequenceNumber <= m_lastSequenceNumberProcessed + 1 ) )
19171944 {
@@ -1971,25 +1998,41 @@ private async Task OnMessageReceived()
19711998 }
19721999 }
19732000
2001+ // process new keep alive messages.
2002+ FastKeepAliveNotificationEventHandler keepAliveCallback = m_fastKeepAliveCallback ;
2003+ if ( keepAliveToProcess != null && keepAliveCallback != null )
2004+ {
2005+ foreach ( IncomingMessage message in keepAliveToProcess )
2006+ {
2007+ var keepAlive = new NotificationData {
2008+ PublishTime = message . Timestamp ,
2009+ SequenceNumber = message . SequenceNumber
2010+ } ;
2011+ keepAliveCallback ( this , keepAlive ) ;
2012+ }
2013+ }
2014+
19742015 // process new messages.
19752016 if ( messagesToProcess != null )
19762017 {
2018+ int noNotificationsReceived ;
19772019 FastDataChangeNotificationEventHandler datachangeCallback = m_fastDataChangeCallback ;
19782020 FastEventNotificationEventHandler eventCallback = m_fastEventCallback ;
1979- int noNotificationsReceived = 0 ;
19802021
1981- for ( int ii = 0 ; ii < messagesToProcess . Count ; ii ++ )
2022+ foreach ( NotificationMessage message in messagesToProcess )
19822023 {
1983- NotificationMessage message = messagesToProcess [ ii ] ;
19842024 noNotificationsReceived = 0 ;
19852025 try
19862026 {
1987- for ( int jj = 0 ; jj < message . NotificationData . Count ; jj ++ )
2027+ foreach ( ExtensionObject notificationData in message . NotificationData )
19882028 {
1989- DataChangeNotification datachange = message . NotificationData [ jj ] . Body as DataChangeNotification ;
2029+ var datachange = notificationData . Body as DataChangeNotification ;
19902030
19912031 if ( datachange != null )
19922032 {
2033+ datachange . PublishTime = message . PublishTime ;
2034+ datachange . SequenceNumber = message . SequenceNumber ;
2035+
19932036 noNotificationsReceived += datachange . MonitoredItems . Count ;
19942037
19952038 if ( ! m_disableMonitoredItemCache )
@@ -2003,10 +2046,13 @@ private async Task OnMessageReceived()
20032046 }
20042047 }
20052048
2006- EventNotificationList events = message . NotificationData [ jj ] . Body as EventNotificationList ;
2049+ var events = notificationData . Body as EventNotificationList ;
20072050
20082051 if ( events != null )
20092052 {
2053+ events . PublishTime = message . PublishTime ;
2054+ events . SequenceNumber = message . SequenceNumber ;
2055+
20102056 noNotificationsReceived += events . Events . Count ;
20112057
20122058 if ( ! m_disableMonitoredItemCache )
@@ -2020,7 +2066,7 @@ private async Task OnMessageReceived()
20202066 }
20212067 }
20222068
2023- StatusChangeNotification statusChanged = message . NotificationData [ jj ] . Body as StatusChangeNotification ;
2069+ StatusChangeNotification statusChanged = notificationData . Body as StatusChangeNotification ;
20242070
20252071 if ( statusChanged != null )
20262072 {
@@ -2476,6 +2522,7 @@ private IncomingMessage FindOrCreateEntry(DateTime utcNow, uint sequenceNumber)
24762522 private bool m_disableMonitoredItemCache ;
24772523 private FastDataChangeNotificationEventHandler m_fastDataChangeCallback ;
24782524 private FastEventNotificationEventHandler m_fastEventCallback ;
2525+ private FastKeepAliveNotificationEventHandler m_fastKeepAliveCallback ;
24792526 private int m_outstandingMessageWorkers ;
24802527 private SemaphoreSlim m_messageWorkersSemaphore ;
24812528 private bool m_sequentialPublishing ;
@@ -2569,6 +2616,11 @@ public enum SubscriptionChangeMask
25692616 /// </summary>
25702617 public delegate void FastEventNotificationEventHandler ( Subscription subscription , EventNotificationList notification , IList < string > stringTable ) ;
25712618
2619+ /// <summary>
2620+ /// The delegate used to receive keep alive notifications via a direct function call instead of a .NET Event.
2621+ /// </summary>
2622+ public delegate void FastKeepAliveNotificationEventHandler ( Subscription subscription , NotificationData notification ) ;
2623+
25722624 #region SubscriptionStateChangedEventArgs Class
25732625 /// <summary>
25742626 /// The event arguments provided when the state of a subscription changes.
0 commit comments