让我们分析一个实时应用程序,以获取最新的twitter feed和其标签。 早些时候,我们已经看到了storm和spark与kafka的集成。 在这两种情况下,我们创建了一个kafka生产者(使用cli)向kafka生态系统发送消息。 然后,storm和spark集成通过使用kafka消费者读取消息,并将其分别注入到storm和spark生态系统中。 因此,实际上我们需要创建一个kafka producer,
一旦kafka接收到 hashtags
,storm / spark集成接收到该信息并将其发送到storm / spark生态系统。
“twitter streaming api"可以使用任何编程语言访问。 “twitter4j"是一个开源的非官方java库,它提供了一个基于java的模块,可以轻松访问“twitter streaming api"。 “twitter4j"提供了一个基于监听器的框架来访问tweet。 要访问“twitter streaming api",我们需要登录twitter开发者帐户,并应获取以下 oauth 身份验证详细信息。
创建开发人员帐户后,下载“twitter4j"jar文件并将其放置在java类路径中。
完整的twitter kafka生产者编码(kafkatwitterproducer.java)如下所列 -
import java.util.arrays; import java.util.properties; import java.util.concurrent.linkedblockingqueue; import twitter4j.*; import twitter4j.conf.*; import org.apache.kafka.clients.producer.producer; import org.apache.kafka.clients.producer.kafkaproducer; import org.apache.kafka.clients.producer.producerrecord; public class kafkatwitterproducer { public static void main(string[] args) throws exception { linkedblockingqueue<status> queue = new linkedblockingqueue<sta-tus>(1000); if(args.length < 5){ system.out.println( "usage: kafkatwitterproducer <twitter-consumer-key> <twitter-consumer-secret> <twitter-access-token> <twitter-access-token-secret> <topic-name> <twitter-search-keywords>"); return; } string consumerkey = args[0].tostring(); string consumersecret = args[1].tostring(); string accesstoken = args[2].tostring(); string accesstokensecret = args[3].tostring(); string topicname = args[4].tostring(); string[] arguments = args.clone(); string[] keywords = arrays.copyofrange(arguments, 5, arguments.length); configurationbuilder cb = new configurationbuilder(); cb.setdebugenabled(true) .setoauthconsumerkey(consumerkey) .setoauthconsumersecret(consumersecret) .setoauthaccesstoken(accesstoken) .setoauthaccesstokensecret(accesstokensecret); twitterstream twitterstream = new twitterstreamfactory(cb.build()).get-instance(); statuslistener listener = new statuslistener() { @override public void onstatus(status status) { queue.offer(status); // system.out.println("@" + status.getuser().getscreenname() + " - " + status.gettext()); // system.out.println("@" + status.getuser().getscreen-name()); /*for(urlentity urle : status.geturlentities()) { system.out.println(urle.getdisplayurl()); }*/ /*for(hashtagentity hashtage : status.gethashtagentities()) { system.out.println(hashtage.gettext()); }*/ } @override public void ondeletionnotice(statusdeletionnotice statusdeletion-notice) { // system.out.println("got a status deletion notice id:" + statusdeletionnotice.getstatusid()); } @override public void ontracklimitationnotice(int numberoflimitedstatuses) { // system.out.println("got track limitation notice:" + num-beroflimitedstatuses); } @override public void onscrubgeo(long userid, long uptostatusid) { // system.out.println("got scrub_geo event userid:" + userid + "uptostatusid:" + uptostatusid); } @override public void onstallwarning(stallwarning warning) { // system.out.println("got stall warning:" + warning); } @override public void onexception(exception ex) { ex.printstacktrace(); } }; twitterstream.addlistener(listener); filterquery query = new filterquery().track(keywords); twitterstream.filter(query); thread.sleep(5000); //add kafka producer config settings properties props = new properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serializa-tion.stringserializer"); props.put("value.serializer", "org.apache.kafka.common.serializa-tion.stringserializer"); producer<string, string> producer = new kafkaproducer<string, string>(props); int i = 0; int j = 0; while(i < 10) { status ret = queue.poll(); if (ret == null) { thread.sleep(100); i++; }else { for(hashtagentity hashtage : ret.gethashtagentities()) { system.out.println("hashtag: " + hashtage.gettext()); producer.send(new producerrecord<string, string>( top-icname, integer.tostring(j++), hashtage.gettext())); } } } producer.close(); thread.sleep(5000); twitterstream.shutdown(); } }
使用以下命令编译应用程序 -
javac -cp "/path/to/kafka/libs/*":"/path/to/twitter4j/lib/*":. kafkatwitterproducer.java
打开两个控制台。 在一个控制台中运行上面编译的应用程序,如下所示。
java -cp “/path/to/kafka/libs/*":"/path/to/twitter4j/lib/*": . kafkatwitterproducer <twitter-consumer-key> <twitter-consumer-secret> <twitter-access-token> <twitter-ac-cess-token-secret> my-first-topic food
在另一个窗口中运行前一章中解释的spark / storm应用程序中的任何一个。 主要要注意的是,在这两种情况下使用的主题应该是相同的。 在这里,我们使用“我的第一主题"作为主题名称。
此应用程序的输出将取决于关键字和twitter的当前feed。 下面指定样本输出(集成storm)。
. . . food : 1 foodie : 2 burger : 1 . . .