Skip to content
This repository was archived by the owner on Jan 27, 2020. It is now read-only.

Twitter Content Filter Tutorial

Christian Kreutzfeldt edited this page Apr 22, 2015 · 2 revisions

This tutorial shows how to set up a pipeline which reads content from a source, filters it for specific values and copies the results into a destination (here: kafka topic).

To learn more about the configuration format, please read the associated documentation on that topic.

We assume that you have a standalone processing node up and running. If not, see the build and deployment instruction for guidance.

General settings

The pipeline set up in this use case is named twitter-filter-to-kafka_.

Queues

As this pipelines reads content from a source, forwards it to a filter and finally writes it to a sink, we need two queues for exchanging data between pipeline components:

  • twitter-content (raw data received from twitter streaming api)
  • filtered-content (filtered content)

Components

The pipeline configures three components:

The source component establishes a connection with the Twitter streaming api using the spqr-twitter source.

All incoming data is handed over to the direct response operator which filters the contents for a specific value in data.cs-host using the filter operator found in spqr-json.

The matching events are finally written to Kafka using the emitter found in spqr-kafka.

Deployment

To put the pipeline to life, we assume that you have a Kafka instance available somewhere and that a processing node is running in standalone mode (to keep things simple).

As the processing node comes with a set of pre-installed components, all you have to do, is POST the configuration shown below to processing node running at port 7070 (localhost): (configuration is assumed to live inside file twitter-filter-to-kafka.json)

curl -H "Content-Type: application/json" -X POST -d @twitter-filter-to-kafka.json http://localhost:7070/pipelines/

The response should look like the following and the Kafka topic should receive incoming data:

{
  state: "OK"
  msg: ""
  pid: "twitter-filter-to-kafka"
}

Configuration

{
  "id" : "twitter-filter-to-kafka",
  "queues" : [ 
    { "id" : "twitter-content", "queueSettings" : null },
    { "id" : "filtered-content", "queueSettings" : null } 
  ],
  "components" : [ {
    "id" : "twitter-stream-reader",
    "type" : "SOURCE",
    "name" : "twitterSource",
    "version" : "0.0.1",
    "settings" : {
      "twitter.consumer.key" : "<your_consumer_key>",
      "twitter.consumer.secret" : "<your_consumer_secret>",
      "twitter.token.key" : "<your_token_key>",
      "twitter.token.secret" : "<your_token_secret>",
      "twitter.tweet.terms" : "fifa,uefa,soccer"
    },
    "fromQueue" : "",
    "toQueue" : "twitter-content"
  },  
  {
    "id" : "twitter-content-filter",
    "type" : "DIRECT_RESPONSE_OPERATOR",
    "name" : "jsonContentFilter",
    "version" : "0.0.1",
    "settings" : {
      "field.1.path": "text",
      "field.1.expression": "(?i).*soccer.*",
      "field.1.type": "STRING"
    },
    "fromQueue": "twitter-content",
    "toQueue": "filtered-content"
  }, 
  {
    "id" : "kafka-topic-emitter",
    "type" : "EMITTER",
    "name" : "kafkaEmitter",
    "version" : "0.0.1",
    "settings" : {
      "clientId" : "twitterToKafka",
      "topic" : "twitter",
      "metadataBrokerList" : "localhost:9092",
      "zookeeperConnect" : "localhost:2181",
      "messageAcking" : "false",
      "charset" : "UTF-8"
    },
    "fromQueue" : "filtered-content",
    "toQueue" : ""
  } ]
}
Clone this wiki locally