Kafka入门 专题
您的位置:Kafka > Kafka入门 专题 > Apache Kafka 实时应用程序(Twitter)
Apache Kafka 实时应用程序(Twitter)
作者:--    发布时间:2019-11-20

让我们分析一个实时应用程序,以获取最新的twitter feed和其标签。 早些时候,我们已经看到了storm和spark与kafka的集成。 在这两种情况下,我们创建了一个kafka生产者(使用cli)向kafka生态系统发送消息。 然后,stormspark集成通过使用kafka消费者读取消息,并将其分别注入到storm和spark生态系统中。 因此,实际上我们需要创建一个kafka producer,

  • 使用“twitter streaming api"阅读twitter feed,
  • 处理feeds,
  • 提取hashtags
  • 发送到kafka。

一旦kafka接收到 hashtags ,storm / spark集成接收到该信息并将其发送到storm / spark生态系统。

twitter streaming api

“twitter streaming api"可以使用任何编程语言访问。 “twitter4j"是一个开源的非官方java库,它提供了一个基于java的模块,可以轻松访问“twitter streaming api"。 “twitter4j"提供了一个基于监听器的框架来访问tweet。 要访问“twitter streaming api",我们需要登录twitter开发者帐户,并应获取以下 oauth 身份验证详细信息。

  • customerkey
  • customersecret
  • accesstoken
  • accesstookensecret

创建开发人员帐户后,下载“twitter4j"jar文件并将其放置在java类路径中。

完整的twitter kafka生产者编码(kafkatwitterproducer.java)如下所列 -

import java.util.arrays;
import java.util.properties;
import java.util.concurrent.linkedblockingqueue;

import twitter4j.*;
import twitter4j.conf.*;

import org.apache.kafka.clients.producer.producer;
import org.apache.kafka.clients.producer.kafkaproducer;
import org.apache.kafka.clients.producer.producerrecord;

public class kafkatwitterproducer {
   public static void main(string[] args) throws exception {
      linkedblockingqueue<status> queue = new linkedblockingqueue<sta-tus>(1000);
      
      if(args.length < 5){
         system.out.println(
            "usage: kafkatwitterproducer <twitter-consumer-key>
            <twitter-consumer-secret> <twitter-access-token>
            <twitter-access-token-secret>
            <topic-name> <twitter-search-keywords>");
         return;
      }
      
      string consumerkey = args[0].tostring();
      string consumersecret = args[1].tostring();
      string accesstoken = args[2].tostring();
      string accesstokensecret = args[3].tostring();
      string topicname = args[4].tostring();
      string[] arguments = args.clone();
      string[] keywords = arrays.copyofrange(arguments, 5, arguments.length);

      configurationbuilder cb = new configurationbuilder();
      cb.setdebugenabled(true)
         .setoauthconsumerkey(consumerkey)
         .setoauthconsumersecret(consumersecret)
         .setoauthaccesstoken(accesstoken)
         .setoauthaccesstokensecret(accesstokensecret);

      twitterstream twitterstream = new twitterstreamfactory(cb.build()).get-instance();
      statuslistener listener = new statuslistener() {
        
         @override
         public void onstatus(status status) {      
            queue.offer(status);

            // system.out.println("@" &plus; status.getuser().getscreenname() 
               &plus; " - " &plus; status.gettext());
            // system.out.println("@" &plus; status.getuser().getscreen-name());

            /*for(urlentity urle : status.geturlentities()) {
               system.out.println(urle.getdisplayurl());
            }*/

            /*for(hashtagentity hashtage : status.gethashtagentities()) {
               system.out.println(hashtage.gettext());
            }*/
         }
         
         @override
         public void ondeletionnotice(statusdeletionnotice statusdeletion-notice) {
            // system.out.println("got a status deletion notice id:" 
               &plus; statusdeletionnotice.getstatusid());
         }
         
         @override
         public void ontracklimitationnotice(int numberoflimitedstatuses) {
            // system.out.println("got track limitation notice:" &plus; 
               num-beroflimitedstatuses);
         }

         @override
         public void onscrubgeo(long userid, long uptostatusid) {
            // system.out.println("got scrub_geo event userid:" &plus; userid &plus; 
            "uptostatusid:" &plus; uptostatusid);
         }      
         
         @override
         public void onstallwarning(stallwarning warning) {
            // system.out.println("got stall warning:" &plus; warning);
         }
         
         @override
         public void onexception(exception ex) {
            ex.printstacktrace();
         }
      };
      twitterstream.addlistener(listener);
      
      filterquery query = new filterquery().track(keywords);
      twitterstream.filter(query);

      thread.sleep(5000);
      
      //add kafka producer config settings
      properties props = new properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("acks", "all");
      props.put("retries", 0);
      props.put("batch.size", 16384);
      props.put("linger.ms", 1);
      props.put("buffer.memory", 33554432);
      
      props.put("key.serializer", 
         "org.apache.kafka.common.serializa-tion.stringserializer");
      props.put("value.serializer", 
         "org.apache.kafka.common.serializa-tion.stringserializer");
      
      producer<string, string> producer = new kafkaproducer<string, string>(props);
      int i = 0;
      int j = 0;
      
      while(i < 10) {
         status ret = queue.poll();
         
         if (ret == null) {
            thread.sleep(100);
            i++;
         }else {
            for(hashtagentity hashtage : ret.gethashtagentities()) {
               system.out.println("hashtag: " &plus; hashtage.gettext());
               producer.send(new producerrecord<string, string>(
                  top-icname, integer.tostring(j++), hashtage.gettext()));
            }
         }
      }
      producer.close();
      thread.sleep(5000);
      twitterstream.shutdown();
   }
}

汇编

使用以下命令编译应用程序 -

javac -cp "/path/to/kafka/libs/*":"/path/to/twitter4j/lib/*":. kafkatwitterproducer.java

执行

打开两个控制台。 在一个控制台中运行上面编译的应用程序,如下所示。

java -cp “/path/to/kafka/libs/*":"/path/to/twitter4j/lib/*":
. kafkatwitterproducer <twitter-consumer-key>
<twitter-consumer-secret>
<twitter-access-token>
<twitter-ac-cess-token-secret>
my-first-topic food

在另一个窗口中运行前一章中解释的spark / storm应用程序中的任何一个。 主要要注意的是,在这两种情况下使用的主题应该是相同的。 在这里,我们使用“我的第一主题"作为主题名称。

输出

此应用程序的输出将取决于关键字和twitter的当前feed。 下面指定样本输出(集成storm)。

. . .
food : 1
foodie : 2
burger : 1
. . .

网站声明:
本站部分内容来自网络,如您发现本站内容
侵害到您的利益,请联系本站管理员处理。
联系站长
373515719@qq.com
关于本站:
编程参考手册