REST API Example Application: splunkd.com

Developers: Carl Yestrau and David Carasso

What does our application do?

We built a website, http://splunkd.com/, that lets users search Twitter using the Splunk REST API. The application uses the Tornado web server, Twitter's REST API, and Splunk's REST API.

How do we get the tweets?

We have one Python script running in the background that hits Twitter's REST endpoints and appends the tweets it retrieves to a log file.
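
Roughly, the main loop looks like this (a simplified sketch, not the script verbatim; the helpers it calls are shown below):

    # Simplified sketch of the background script's main loop. getSeeds(),
    # getSearch(), and toEvent() are defined below; the sleep interval is
    # an assumption, since the real pacing depends on Twitter's rate limits.
    import time

    searches = getSeeds()
    log = open('tweets.log', 'a')
    while True:
        for query in list(searches):
            for tweet in getSearch(query):
                log.write(toEvent(tweet) + '\n')
            log.flush()
        time.sleep(60)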

Given that Twitter rate-limits the number of tweets you can pull back, we decided to go after the "most important" tweets via Twitter's Search API. Our goal is to find the keywords people think are important. First, we get some seed terms from Yahoo Buzz:


    import re
    import urllib2

    def getSeeds():
        # Scrape the numbered titles out of the Yahoo Buzz feed and use
        # them as our initial search terms.
        url = 'http://buzzlog.buzz.yahoo.com/feeds/buzzoverl.xml'
        page = urllib2.urlopen(url)
        xml = page.read()
        seeds = set(re.findall(r"<title>\d+\. (.*)</title>", xml))
        return seeds

Then we take those seed terms and search twitter for them, retrieving all those tweets:

    import urllib
    import simplejson

    def getSearch(query):
        # Hit Twitter's Search API and return the list of matching tweets.
        url = 'http://search.twitter.com/search.json?q=%s' % urllib.quote_plus(query)
        page = urllib2.urlopen(url)
        json = page.read()
        data = simplejson.loads(json)
        out("s")  # out() is a small progress-printing helper defined elsewhere
        return data['results']

After we've run all our searches, we scan the tweets for any hashtags, and add the most popular to the list of search terms. And the whole thing repeats, fanning out, getting the terms that most people are interested in:

    ...
    # Collect any hashtags we haven't already searched for; they become
    # the next round of search terms.
    newsearches = set()
    for t in tweets:
        text    = t.get('text', '')
        matches = re.findall(r"#([a-zA-Z0-9_-]+)", text)
        for match in matches:
            if match not in oldsearches:
                newsearches.add(match)
    ...

OK, at this point we have a bunch of tweets to output to a log file, which Splunk will then monitor and index. It's important to log *well*: we want lots of attribute=value pairs and timestamps that are clear and unambiguous.

    def toEvent(tweet):
        # Render one tweet as a single log line of attribute=value pairs
        # prefixed with the tweet's timestamp.
        time   = tweet.get('created_at', '')
        text   = cleanTweetText(tweet.get('text', ''))
        sender = getAttrVal(tweet, 'from_user', 'from')
        to     = getAttrVal(tweet, 'to_user',   'to')
        geo = ""
        if tweet.get('geo') is not None and 'coordinates' in tweet['geo']:
            latitude  = tweet['geo']['coordinates'][0]
            longitude = tweet['geo']['coordinates'][1]
            geo = 'latitude="%s" longitude="%s"' % (latitude, longitude)
        out = "%s [%s %s %s] %s" % (time, sender, to, geo, text)
        return re.sub(r"\s+", " ", out).replace(" ]", "]")  # clean up whitespace

These events get appended to tweets.log, and the output looks like this:
    Wed, 22 Jun 2011 08:39:43 +0000 [from="SpotMonkey" to="nectar" latitude="51.503" longitude="-0.1025"] @nectar the Android app is also rather prone to crashing.
    ...
    Sun, 24 Jul 2011 20:36:18 +0000 [from="GamerPerfection"] Re: Green Lantern damn you....
    ...
    Sun, 24 Jul 2011 20:35:04 +0000 [from="HautTotes" to="akacinders"] @akacinders Unfortunately very few places power wheelchair friendly :(

In inputs.conf, the file Splunk uses to specify which files to monitor, we have this entry:
    [monitor:///blaze/twitter/tweets.log]
    index = twitter

At this point, we have a Python script merrily appending to tweets.log, a file that Splunk is indexing. We can now search our Twitter data.
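
For example (a hypothetical query, just to show the attribute=value pairs paying off), we can pull up a user's tweets right from the Splunk search bar:

    index=twitter from="GamerPerfection" | head 10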

How do we search the tweets?

When a user hits our web server, our code is called to generate results. The first thing we must do is have our web application log in to Splunk. Here we hit the /services/auth/login endpoint to retrieve the sessionKey:

    
    ...
    # POST our credentials to splunkd; the response XML contains the
    # sessionKey we pass along on every subsequent request.
    post_args = {
      "username": self.settings["splunk_username"],
      "password": self.settings["splunk_password"],
    }
    response, xml, json, text = self.sync_request("/services/auth/login", post_args=post_args)
    if response.error is None and xml is not None:
        logging.info("Successfully retrieved Splunk session_key")
        return xml.findtext("sessionKey")
    else:
        logging.info("Could not retrieve Splunk session_key")
        return None
    ...
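
For the curious, the same login can be done with nothing but urllib2. Here's a minimal sketch, assuming splunkd is listening on its default management port, 8089, on localhost:

    # Standalone sketch of the login call. The response XML carries the
    # sessionKey, which later requests pass in an
    # "Authorization: Splunk <sessionKey>" header.
    import urllib
    import urllib2
    import xml.etree.ElementTree as ET

    def splunk_login(username, password):
        url = 'https://localhost:8089/services/auth/login'
        body = urllib.urlencode({'username': username, 'password': password})
        response = urllib2.urlopen(url, body).read()
        return ET.fromstring(response).findtext('sessionKey')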
Now that we're logged in to Splunk, there are two ways we run searches: synchronously and asynchronously.

Synchronously

    ...
    # Let the browser cache results aggressively.
    self.set_header("Expires", datetime.datetime.utcnow() + datetime.timedelta(days=365))
    self.async_request(
        "/services/search/jobs/oneshot", self._on_create,
        session_key=self.session_key, max_count=100, search=users_search,
        spawn_process="0", segmentation="inner")

We make a call to /services/search/jobs/oneshot, which is the same as /services/search/jobs with exec_mode=oneshot: it runs the search and brings back the results immediately. We set max_count=100 to retrieve only 100 results. We set spawn_process=0; for safe, simple searches this can be faster than having splunkd spawn a separate search process. Finally, we set segmentation, which controls how the events are rendered, to "inner", which wraps each token of the raw event in its own XML element: <sg>token</sg>.
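
Made directly against the REST API rather than through our Tornado helper, the same oneshot call would look something like this (a sketch reusing splunk_login() from above, with placeholder credentials and a hypothetical query):

    # Run a oneshot search: one POST, results come back in the response body.
    import urllib
    import urllib2

    session_key = splunk_login('admin', 'changeme')  # placeholder credentials
    body = urllib.urlencode({
        'search': 'search index=twitter | head 100',  # hypothetical query
        'max_count': 100,
        'spawn_process': '0',
        'segmentation': 'inner',
    })
    request = urllib2.Request(
        'https://localhost:8089/services/search/jobs/oneshot', body,
        {'Authorization': 'Splunk %s' % session_key})
    results_xml = urllib2.urlopen(request).read()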

Asynchronously

We won't go into this in detail, but basically, to search asynchronously we kick off a search job with the users_search:

    post_args = dict(
        max_count = 100,
        max_time = 2.0,
        required_field_list = ",".join(options.splunk_search_async_required_field_list),
        search = users_search,
        segmentation = "inner",
        spawn_process = "1"
    )
    self.async_request("/services/search/jobs", self._on_async_search_create, session_key=self.session_key, post_args=post_args)
    ...
    sid = xml.findtext("sid")

We then use the search job id, sid, to call the /services/search/jobs/<sid>/events endpoint and retrieve the events.
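
As a raw REST sketch (assuming the session_key and base URL from the login example above; the real handler goes through async_request), fetching the events looks like:

    # GET the events for the job identified by sid.
    request = urllib2.Request(
        'https://localhost:8089/services/search/jobs/%s/events' % sid,
        None,
        {'Authorization': 'Splunk %s' % session_key})
    events_xml = urllib2.urlopen(request).read()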

After we get the results, we cancel the search job:

    ...
    self.async_request("/services/search/jobs/%s/control" % sid, self._on_async_search_cancel, session_key=self.session_key, post_args=dict(action="cancel"))
    ...

Any problems encountered?

We made splunkd.com do a search on every keystroke. Unfortunately, splunkd.com is plagued by an underpowered Amazon EC2 instance, so the results don't come back fast enough, and we need to add a better caching scheme.
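
One likely fix is a small in-memory cache with a short TTL, so repeated keystrokes for the same query never reach splunkd at all. A hypothetical sketch, not what's deployed:

    # Hypothetical per-query cache; entries expire after CACHE_TTL seconds.
    import time

    CACHE_TTL = 30.0   # assumed value
    _cache = {}        # query -> (timestamp, results)

    def cached_results(query, run_search):
        hit = _cache.get(query)
        if hit is not None and time.time() - hit[0] < CACHE_TTL:
            return hit[1]
        results = run_search(query)  # fall through to the real Splunk search
        _cache[query] = (time.time(), results)
        return results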