Stream Computing and MQTT: A Simple Sample

In this topic I will receive data from a sending source with IBM InfoSphere Streams, using a lightweight messaging protocol (MQTT), and store the received data in a local file. The concept I am using is called pub/sub, or the publish/subscribe pattern. We need a few components to make this possible:

  • IBM InfoSphere Streams (stream computing): you can get your copy here (for learning and fun only; no production use allowed)
  • A messaging protocol (used by the client software to either publish or subscribe)
  • A broker (for managing the pub/sub data flows). For more information go to this topic

What is stream computing?

Stream computing delivers real-time analytic processing on constantly changing data in motion. It enables descriptive and predictive analytics to support real-time decisions. Stream computing allows you to capture and analyze all data – all the time, just in time.

Data is all around us – from social media feeds to call data records to videos – but can be difficult to leverage. Sometimes there is simply too much data to collect and store before analyzing it. Sometimes it’s an issue of timing – by the time you store data, analyze it, and respond – it’s too late.

Stream computing changes where, when and how much data you can analyze. Store less, analyze more, and make better decisions, faster with stream computing. For more information: IBM Stream Computing.

The protocol

MQTT is a machine-to-machine (M2M)/”Internet of Things” connectivity protocol. It was designed as an extremely lightweight publish/subscribe messaging transport. It is useful for connections with remote locations where a small code footprint is required and/or network bandwidth is at a premium. For more information: http://mqtt.org/.
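To make "extremely lightweight" concrete, here is a small shell sketch that assembles the bytes of a single MQTT 3.1.1 PUBLISH packet at QoS 0, using the topic and one of the values from the sample further down. The function name mqtt_publish_packet is my own, it only handles short ASCII topics and payloads, and a real client would first have to send a CONNECT packet to the broker. Treat it as an illustration of the wire format, not as a usable client.

```shell
#!/bin/sh
# Hypothetical helper: write the raw bytes of an MQTT 3.1.1 PUBLISH packet
# (QoS 0, DUP and RETAIN cleared) to stdout. ASCII topic/payload only.
mqtt_publish_packet() {
  local topic="$1" payload="$2"
  # Remaining length = 2-byte topic length field + topic + payload.
  # QoS 0 messages carry no packet identifier.
  local remaining=$((2 + ${#topic} + ${#payload}))
  # Keep the sketch simple: single-byte remaining length only (< 128).
  if [ "$remaining" -gt 127 ]; then
    echo "topic/payload too long for this sketch" >&2
    return 1
  fi
  printf '\060'                                # 0x30: PUBLISH, no flags
  printf "\\$(printf '%03o' "$remaining")"     # remaining length
  printf '\000'                                # topic length, high byte
  printf "\\$(printf '%03o' "${#topic}")"      # topic length, low byte
  printf '%s%s' "$topic" "$payload"            # topic name, then payload
}

# Dump the packet for one sample message as hex bytes.
mqtt_publish_packet streams RED | od -An -tx1
```

For topic streams and payload RED this produces 14 bytes: 30 0c 00 07, followed by the ASCII bytes of "streams" and "RED". Only four of those bytes are protocol overhead.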

What is happening?

You first have to install IBM InfoSphere Streams (mine is running on CentOS 6.5). It is pretty straightforward: unpack the installation archive to a directory, find the “dependency_checker.sh” script (in the StreamsInstallFiles directory) and run it. It will tell you which libraries are needed, so do a few yum installs and you're done.
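The dependency check step can be wrapped in a small helper like the one below. The function name run_dependency_check and the unpack directory are placeholders of my own, and the sketch assumes the checker script is still executable after unpacking. Which yum packages you then need depends entirely on what the checker reports on your system.

```shell
#!/bin/sh
# Hypothetical helper: run dependency_checker.sh from an unpacked
# Streams installation archive. $1 is the directory you unpacked into.
run_dependency_check() {
  local dir="$1/StreamsInstallFiles"
  if [ ! -x "$dir/dependency_checker.sh" ]; then
    echo "no executable dependency_checker.sh in $dir" >&2
    return 1
  fi
  # Run it from its own directory; it reports any missing libraries,
  # which you then install with yum.
  (cd "$dir" && ./dependency_checker.sh)
}

# Example (the path is a placeholder):
# run_dependency_check "$HOME/streams-install"
```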

After installation, adjust your .bashrc file. This is mine:

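# .bashrc

# Source global definitions
if [ -f /etc/bashrc ]; then
    . /etc/bashrc
fi

# User specific aliases and functions
# JAVA_HOME points at the JRE root; its bin directory goes on the PATH
export JAVA_HOME=/opt/ibm/java-x86_64-70/jre
export PATH=$PATH:$JAVA_HOME/bin
# Installation directory of the MQTT client (used by the MQTT operators)
export STREAMS_MESSAGING_MQTT_HOME=/opt/ibm/MQTT/SDK
# Set up the Streams environment for every new shell
source /opt/ibm/InfoSphereStreams/bin/streamsprofile.sh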

I do not think you need a separate Java install, since one is included with Streams. To make the MQTT sample work, however, you need to add the MQTT client 3.15 to your environment; please download it via Fix Central. Point the STREAMS_MESSAGING_MQTT_HOME environment variable at the installation directory of the MQTT client. Finally, source streamsprofile.sh each time you log in (hence the last line in my .bashrc). Do this for each user who will be working with Streams.
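To confirm that a fresh login picks up these settings, a quick check like the following can help. It is a hypothetical helper (check_streams_env is not part of Streams) and only verifies that the two variables from my .bashrc are set and point to existing directories.

```shell
#!/bin/sh
# Hypothetical sanity check for the variables set in the .bashrc above.
# Reports each problem on stderr and returns non-zero if anything is wrong.
check_streams_env() {
  local status=0 var value
  for var in JAVA_HOME STREAMS_MESSAGING_MQTT_HOME; do
    # Indirect lookup of the variable whose name is in $var.
    eval "value=\${$var:-}"
    if [ -z "$value" ]; then
      echo "$var is not set" >&2
      status=1
    elif [ ! -d "$value" ]; then
      echo "$var points to a missing directory: $value" >&2
      status=1
    fi
  done
  return "$status"
}

# Example, right after logging in:
# check_streams_env && echo "environment looks OK"
```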

Please follow the First Steps guide to finish up the installation and get yourself started with Streams.

Streams Main

After walking through the First Steps guide, you will end up with a project and an SPL application. To make this work, I added com.ibm.streams.messaging to my project as a dependency, because that toolkit provides the MQTT operators we need. In the SPL application I added three operators:

  • MQTTSource
  • Parse
  • FileSink

For the streams between the operators I used a single attribute each:

  • MQTTSource -> Parse: blob
  • Parse -> FileSink: rstring

For each of the operators, I used the following parameters:

[Screenshot: MQTTSource parameters]

[Screenshot: Parse parameters]

[Screenshot: FileSink parameters]

The overall code will look like this:

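namespace my.mqtt.streaming ;

use com.ibm.streams.messaging.mqtt::MQTTSource ;

composite SimpleStream
{
  graph
    // Subscribe to the "streams" topic; each message arrives as a blob
    // (the attribute name "message" is illustrative)
    (stream<blob message> mqtt_topic) as SimpleMQTTSource = MQTTSource()
    {
      param
        topics : [ "streams" ] ;
        serverURI : "localhost:1883" ;
        format : block ;
    }

    // Parse each blob as CSV into a single rstring attribute
    // (the attribute name "line" is illustrative)
    (stream<rstring line> SimpleParser) as SimpleParse = Parse(mqtt_topic)
    {
      param
        format : csv ;
    }

    // Write every parsed value to the local file mqtt.topics
    () as SimpleFileSink = FileSink(SimpleParser as inPort0Alias)
    {
      param
        file : "mqtt.topics" ;
        format : csv ;
        quoteStrings : false ;
    }
}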

Once your SPL application builds without errors, you can launch it. It will be deployed to your Streams instance (the one you created with the First Steps guide):

[Screenshot: Streams deployed]

When you feed it MQTT messages, you can see that the deployed SPL application is working:

[Screenshot: Streams working]
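This sample does not depend on a particular publishing client. As one possible way to feed it, the sketch below assumes the Mosquitto command-line client (mosquitto_pub) is installed and that a broker listens on localhost:1883, the address used in the SPL code. The function publish_colors and the MQTT_PUB override are my own additions for illustration.

```shell
#!/bin/sh
# Hypothetical helper: publish each argument as one MQTT message to the
# "streams" topic on localhost:1883 (the values used in the SPL code).
# Assumes the Mosquitto client mosquitto_pub; set MQTT_PUB to another
# command that accepts the same -h/-p/-t/-m flags if you use something else.
publish_colors() {
  local pub="${MQTT_PUB:-mosquitto_pub}" color
  for color in "$@"; do
    # -h broker host, -p port, -t topic, -m message payload
    "$pub" -h localhost -p 1883 -t streams -m "$color" || return 1
  done
}

# Example: publish_colors RED GREEN BLACK
```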

Finally, let's take a look at the local file:

[Screenshot: Local file]

This is a very simple sample: we are receiving MQTT messages on the topic “streams”, each carrying one of three values: “RED”, “GREEN” or “BLACK”.
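For a quick overview of what arrived, you can count the values in the output file. This is a hypothetical helper (count_values is my own name) that assumes one value per line, which is what FileSink writes with the CSV parameters in the code. A relative FileSink path such as “mqtt.topics” should be resolved against the application's data directory, so that is where to look for the file.

```shell
#!/bin/sh
# Hypothetical helper: count how often each value occurs in the file written
# by FileSink (one line per received message, e.g. RED). Prints
# "VALUE COUNT" lines sorted by value.
count_values() {
  awk '{ n[$0]++ } END { for (v in n) print v, n[v] }' "$1" | sort
}

# Example (adjust the path to where mqtt.topics was written):
# count_values mqtt.topics
```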
