Using Reactive Extensions (Rx) with the Splunk SDK for C# 2.0

With the Splunk SDK for C#, you can use Reactive Extensions for .NET (Rx.NET) to "observe" Splunk search results and receive notifications as that collection produces them. Rx is built around push-based data streams that you subscribe to through an observer interface. The Splunk SDK for C# defines an Observable<T> class that provides basic push-based notifications to observers. This implementation of Observable<T> is available without first installing Rx, but you are not limited to it: you can use either the SDK's Observable<T> or an observer defined in Rx.
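
The observer pattern described above can be sketched with only the IObservable<T> and IObserver<T> interfaces from the .NET base class library, so it does not require Rx or the SDK. This is a minimal illustration, not the SDK's implementation: RecordSource, CollectingObserver, and PushModelSketch are hypothetical names introduced for the example, and the source pushes synchronously and omits the error path for brevity.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a result stream; NOT part of the Splunk SDK.
public sealed class RecordSource : IObservable<string>
{
    private readonly IEnumerable<string> records;

    public RecordSource(IEnumerable<string> records)
    {
        this.records = records;
    }

    // Pushes every record to the observer, then signals completion.
    // A real source would call observer.OnError if reading failed.
    public IDisposable Subscribe(IObserver<string> observer)
    {
        foreach (string record in records)
        {
            observer.OnNext(record);
        }

        observer.OnCompleted();
        return new NoopSubscription();
    }

    // Nothing to cancel here: all notifications are sent before Subscribe returns.
    private sealed class NoopSubscription : IDisposable
    {
        public void Dispose() { }
    }
}

// Hypothetical observer that records each notification it receives.
public sealed class CollectingObserver : IObserver<string>
{
    public List<string> Log { get; } = new List<string>();

    public void OnNext(string value) { Log.Add("next: " + value); }

    public void OnError(Exception error) { Log.Add("error: " + error.Message); }

    public void OnCompleted() { Log.Add("completed"); }
}

public static class PushModelSketch
{
    // Subscribes one observer to a two-record source and returns what it saw.
    public static List<string> Run()
    {
        var source = new RecordSource(new[] { "first", "second" });
        var observer = new CollectingObserver();

        using (source.Subscribe(observer))
        {
            // The source has already pushed everything by this point.
        }

        return observer.Log;
    }
}
```

Calling PushModelSketch.Run() returns the notifications in the order they were delivered: one entry per record, followed by a single completion entry. The three observer methods correspond to the loop body, the catch block, and the code after the loop in a pull-style foreach.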

Compare these two search examples, which are based on the normal-search example in the SDK. The first is a "pull" model that uses a foreach loop to iterate through the search results, while the second is a "push" model that uses a subscription to observe the collection of search result records:

//// Search : Pull model (foreach loop => IEnumerable)

Job job = await service.Jobs.CreateAsync("search index=_internal | head 10");
SearchResultStream stream;

using (stream = await job.GetSearchResultsAsync())
{
    try
    {
        foreach (SearchResult result in stream)
        {
            Console.WriteLine(string.Format("{0:D8}: {1}", stream.ReadCount, result));
        }
        
        Console.WriteLine("End of search results");
    }
    catch (Exception e)
    {
        Console.WriteLine(string.Format("SearchResults error: {0}", e.Message));
    }
}

//// Search : Push model (by way of subscription to search result records => IObservable)

job = await service.Jobs.CreateAsync("search index=_internal | head 10");

using (stream = await job.GetSearchResultsAsync())
{
    stream.Subscribe(new Observer<SearchResult>(
        onNext: (result) =>
        {
            Console.WriteLine(string.Format("{0:D8}: {1}", stream.ReadCount, result));
        },
        onError: (e) =>
        {
            Console.WriteLine(string.Format("SearchResults error: {0}", e.Message));
        },
        onCompleted: () =>
        {
            Console.WriteLine("End of search results");
        }));

}
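
In the push model, the subscribing code leaves the using block as soon as Subscribe returns. If a source delivers its notifications on another thread, the subscriber may need to wait for completion before the source is disposed. Whether SearchResultStream behaves this way is not stated here, so the following is only a sketch of the general technique using base class library types. BackgroundSource, SignalingObserver, and WaitSketch are hypothetical names, and the five-second timeout is an arbitrary choice.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical source that pushes from a background task; NOT the SDK's stream.
public sealed class BackgroundSource : IObservable<int>
{
    public IDisposable Subscribe(IObserver<int> observer)
    {
        // Notifications are sent after Subscribe has already returned.
        Task.Run(() =>
        {
            for (int i = 1; i <= 3; i++)
            {
                observer.OnNext(i);
            }

            observer.OnCompleted();
        });

        return new NoopSubscription();
    }

    private sealed class NoopSubscription : IDisposable
    {
        public void Dispose() { }
    }
}

// Hypothetical observer that signals an event when the sequence ends.
public sealed class SignalingObserver : IObserver<int>, IDisposable
{
    private readonly ManualResetEventSlim done = new ManualResetEventSlim(false);

    public List<int> Received { get; } = new List<int>();

    public void OnNext(int value) { Received.Add(value); }

    // Both terminal notifications release any thread blocked in Wait.
    public void OnError(Exception error) { done.Set(); }

    public void OnCompleted() { done.Set(); }

    // Blocks until the sequence ends or the timeout elapses.
    public bool Wait(TimeSpan timeout) { return done.Wait(timeout); }

    public void Dispose() { done.Dispose(); }
}

public static class WaitSketch
{
    // Subscribes, waits for completion, and returns how many values arrived.
    public static int Run()
    {
        using (var observer = new SignalingObserver())
        {
            new BackgroundSource().Subscribe(observer);
            observer.Wait(TimeSpan.FromSeconds(5));
            return observer.Received.Count;
        }
    }
}
```

Signaling from both OnError and OnCompleted matters: a sequence ends with exactly one of the two, so waiting on only OnCompleted would block until the timeout whenever the source fails.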