让我们分析一个实时应用程序,以获取最新的twitter feed和其标签。 早些时候,我们已经看到了storm和spark与kafka的集成。 在这两种情况下,我们创建了一个kafka生产者(使用cli)向kafka生态系统发送消息。 然后,storm和spark集成通过使用kafka消费者读取消息,并将其分别注入到storm和spark生态系统中。 因此,实际上我们需要创建一个kafka producer,
一旦kafka接收到 hashtags
,storm / spark集成接收到该信息并将其发送到storm / spark生态系统。
“twitter streaming api"可以使用任何编程语言访问。 “twitter4j"是一个开源的非官方java库,它提供了一个基于java的模块,可以轻松访问“twitter streaming api"。 “twitter4j"提供了一个基于监听器的框架来访问tweet。 要访问“twitter streaming api",我们需要登录twitter开发者帐户,并应获取以下 oauth 身份验证详细信息。
创建开发人员帐户后,下载“twitter4j"jar文件并将其放置在java类路径中。
完整的twitter kafka生产者编码(kafkatwitterproducer.java)如下所列 -
import java.util.arrays;
import java.util.properties;
import java.util.concurrent.linkedblockingqueue;
import twitter4j.*;
import twitter4j.conf.*;
import org.apache.kafka.clients.producer.producer;
import org.apache.kafka.clients.producer.kafkaproducer;
import org.apache.kafka.clients.producer.producerrecord;
public class kafkatwitterproducer {
public static void main(string[] args) throws exception {
linkedblockingqueue<status> queue = new linkedblockingqueue<sta-tus>(1000);
if(args.length < 5){
system.out.println(
"usage: kafkatwitterproducer <twitter-consumer-key>
<twitter-consumer-secret> <twitter-access-token>
<twitter-access-token-secret>
<topic-name> <twitter-search-keywords>");
return;
}
string consumerkey = args[0].tostring();
string consumersecret = args[1].tostring();
string accesstoken = args[2].tostring();
string accesstokensecret = args[3].tostring();
string topicname = args[4].tostring();
string[] arguments = args.clone();
string[] keywords = arrays.copyofrange(arguments, 5, arguments.length);
configurationbuilder cb = new configurationbuilder();
cb.setdebugenabled(true)
.setoauthconsumerkey(consumerkey)
.setoauthconsumersecret(consumersecret)
.setoauthaccesstoken(accesstoken)
.setoauthaccesstokensecret(accesstokensecret);
twitterstream twitterstream = new twitterstreamfactory(cb.build()).get-instance();
statuslistener listener = new statuslistener() {
@override
public void onstatus(status status) {
queue.offer(status);
// system.out.println("@" + status.getuser().getscreenname()
+ " - " + status.gettext());
// system.out.println("@" + status.getuser().getscreen-name());
/*for(urlentity urle : status.geturlentities()) {
system.out.println(urle.getdisplayurl());
}*/
/*for(hashtagentity hashtage : status.gethashtagentities()) {
system.out.println(hashtage.gettext());
}*/
}
@override
public void ondeletionnotice(statusdeletionnotice statusdeletion-notice) {
// system.out.println("got a status deletion notice id:"
+ statusdeletionnotice.getstatusid());
}
@override
public void ontracklimitationnotice(int numberoflimitedstatuses) {
// system.out.println("got track limitation notice:" +
num-beroflimitedstatuses);
}
@override
public void onscrubgeo(long userid, long uptostatusid) {
// system.out.println("got scrub_geo event userid:" + userid +
"uptostatusid:" + uptostatusid);
}
@override
public void onstallwarning(stallwarning warning) {
// system.out.println("got stall warning:" + warning);
}
@override
public void onexception(exception ex) {
ex.printstacktrace();
}
};
twitterstream.addlistener(listener);
filterquery query = new filterquery().track(keywords);
twitterstream.filter(query);
thread.sleep(5000);
//add kafka producer config settings
properties props = new properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer",
"org.apache.kafka.common.serializa-tion.stringserializer");
props.put("value.serializer",
"org.apache.kafka.common.serializa-tion.stringserializer");
producer<string, string> producer = new kafkaproducer<string, string>(props);
int i = 0;
int j = 0;
while(i < 10) {
status ret = queue.poll();
if (ret == null) {
thread.sleep(100);
i++;
}else {
for(hashtagentity hashtage : ret.gethashtagentities()) {
system.out.println("hashtag: " + hashtage.gettext());
producer.send(new producerrecord<string, string>(
top-icname, integer.tostring(j++), hashtage.gettext()));
}
}
}
producer.close();
thread.sleep(5000);
twitterstream.shutdown();
}
}
使用以下命令编译应用程序 -
javac -cp "/path/to/kafka/libs/*":"/path/to/twitter4j/lib/*":. kafkatwitterproducer.java打开两个控制台。 在一个控制台中运行上面编译的应用程序,如下所示。
java -cp “/path/to/kafka/libs/*":"/path/to/twitter4j/lib/*": . kafkatwitterproducer <twitter-consumer-key> <twitter-consumer-secret> <twitter-access-token> <twitter-ac-cess-token-secret> my-first-topic food
在另一个窗口中运行前一章中解释的spark / storm应用程序中的任何一个。 主要要注意的是,在这两种情况下使用的主题应该是相同的。 在这里,我们使用“我的第一主题"作为主题名称。
此应用程序的输出将取决于关键字和twitter的当前feed。 下面指定样本输出(集成storm)。
. . . food : 1 foodie : 2 burger : 1 . . .