우분투 트위터+맵리듀스

log4j.properties
# Root Logger: INFO and above go to the AppFile appender only
# (the console appender below is defined but not attached to the root logger).
log4j.rootLogger=INFO, AppFile
# console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d [%t] %-5p %l - %m%n
#log4j.appender.console.encoding=EUC-KR
# AppFile: daily-rolling log written to app.log in the working directory.
# TwitterDriver reads ./app.log as its MapReduce input; rolled (dated) files are not read.
log4j.appender.AppFile=org.apache.log4j.DailyRollingFileAppender
#log4j.appender.AppFile.File=/data1/services/nwofelis/htdocs/logs/app.log
log4j.appender.AppFile.File=app.log
log4j.appender.AppFile.Append=true
log4j.appender.AppFile.layout=org.apache.log4j.PatternLayout
# Each event: blank line, date, level and caller location, then the message.
log4j.appender.AppFile.layout.ConversionPattern=%n%d%n%p : %l%n%m%n%n
# Rolled files get a ".yyyy-MM-dd" suffix. The literal dot must be quoted with ASCII
# apostrophes (SimpleDateFormat syntax); typographic quotes would end up in the file name.
log4j.appender.AppFile.DatePattern='.'yyyy-MM-dd
#log4j.appender.AppFile.encoding=EUC-KR
twitter4j.properties
# twitter4j OAuth credentials. The values below are placeholders ("(가짜)" = "fake");
# replace them with your own application's keys and do not publish real secrets.
oauth.consumerSecret=mgfgertt(가짜)
oauth.accessToken=302ertertdJi4Z(가짜)
oauth.accessTokenSecret=2aihnAo1ertertre9trUDyx4wH(가짜)
oauth.consumerKey=tO0ertetQjt7(가짜)
TwitterListner.java
package com.sist.mapred3;
import org.apache.log4j.Logger;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
public class TwitterListner implements StatusListener{
private Logger logger=Logger.getLogger(TwitterListner.class);
@Override
public void onException(Exception ex) {
// TODO Auto-generated method stub
System.out.println(ex.getMessage());
}
@Override
public void onDeletionNotice(StatusDeletionNotice arg0) {
// TODO Auto-generated method stub
}
@Override
public void onScrubGeo(long arg0, long arg1) {
// TODO Auto-generated method stub
}
@Override
public void onStallWarning(StallWarning arg0) {
// TODO Auto-generated method stub
}
@Override
public void onStatus(Status status) {
// TODO Auto-generated method stub
System.out.println(“ID:”+status.getUser().getScreenName());
System.out.println(“Date:”+status.getCreatedAt());
System.out.println(“Message:”+status.getText());
logger.info(status.getText());
}
@Override
public void onTrackLimitationNotice(int arg0) {
// TODO Auto-generated method stub
}
}
TwitterMain.java
package com.sist.mapred3;
import twitter4j.FilterQuery;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
public class TwitterMain {
public static void main(String[] args) throws Exception{
TwitterStream ts=new TwitterStreamFactory().getInstance();
FilterQuery fq=new FilterQuery();
String[] data={“메르스”,”표절”,”야구”,”연평해전”,”6.25″};
fq.track(data);
TwitterListner listen=new TwitterListner();
ts.addListener(listen);
ts.filter(fq);
}
}
TwitterDriver.java
package com.sist.mapred4;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.*;
import java.nio.file.Files;
public class TwitterDriver {
public static void main(String[] args) throws Exception
{
try
{
//절대 경로명
File dir=new File(“/home/sist/bigdataStudy/MapReduceProject3/output”);
File[] files=dir.listFiles();
if(dir.exists()){
for(File f:files)
{
f.delete();
}
dir.delete();
}
}catch(Exception ex){}
//환경설정
Configuration conf=new Configuration();
//작업설정(이름부여)
Job job=new Job(conf,”Twitter”);
//실행파일 등록
job.setJarByClass(TwitterDriver.class);
//매퍼등록
job.setMapperClass(TwitterMapper.class);
//리듀스등록
job.setReducerClass(TwitterReducer.class);
//결과값 출력 등록
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//파일 읽기 => 파일 출력
FileInputFormat.addInputPath(job, new Path(“./app.log”));
FileOutputFormat.setOutputPath(job, new Path(“./output”));
//실행요청
job.waitForCompletion(true);
System.out.println(“데이터처리 완료”);
}
}
TwitterMapper.java
package com.sist.mapred4;
import java.io.IOException;
import java.util.regex.Pattern;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.util.regex.*;
public class TwitterMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
private final static IntWritable one=new IntWritable(1);
private final String regex1=”메르스”;
private final String regex2=”표절”;
private final String regex3=”야구”;
private final String regex4=”연평해전”;
private final String regex5=”625″;
private Text outkey=new Text();
@Override
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
Pattern p1=Pattern.compile(regex1);
Pattern p2=Pattern.compile(regex2);
Pattern p3=Pattern.compile(regex3);
Pattern p4=Pattern.compile(regex4);
Pattern p5=Pattern.compile(regex5);
Matcher m1=p1.matcher(value.toString());
Matcher m2=p2.matcher(value.toString());
Matcher m3=p3.matcher(value.toString());
Matcher m4=p4.matcher(value.toString());
Matcher m5=p5.matcher(value.toString());
try
{
while(m1.find())
{
outkey.set(regex1);
context.write(outkey, one);
}
while(m2.find())
{
outkey.set(regex2);
context.write(outkey, one);
}
while(m3.find())
{
outkey.set(regex3);
context.write(outkey, one);
}
while(m4.find())
{
outkey.set(regex4);
context.write(outkey, one);
}
while(m5.find())
{
outkey.set(regex5);
context.write(outkey, one);
}
}catch(Exception ex){}
}
}
TwitterReducer.java
package com.sist.mapred4;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import com.sist.mongo.*;
/**
 * Reduce step. Each call receives all counts emitted for one keyword, already grouped together
 * (the merged state). The total is written to the job output and then saved to MongoDB through
 * TwitterDAO.
 */
public class TwitterReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    /** Reused holder for the per-keyword total. */
    private IntWritable total = new IntWritable();
    /** MongoDB writer, created together with this reducer instance. */
    private TwitterDAO dao = new TwitterDAO();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> value,
            Reducer<Text, IntWritable, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        final int count = sumOf(value);
        total.set(count);
        context.write(key, total);
        dao.insert(key.toString(), count);
    }

    /** Adds up the partial counts emitted for one keyword. */
    private static int sumOf(Iterable<IntWritable> partials) {
        int acc = 0;
        for (IntWritable partial : partials) {
            acc += partial.get();
        }
        return acc;
    }
}
TwitterDAO.java
package com.sist.mongo;
import com.mongodb.*;
public class TwitterDAO {
private MongoClient mc;
private DB db;
private DBCollection dbc;
public TwitterDAO(){
try{
mc=new MongoClient(“localhost”);
db=mc.getDB(“mydb”);
dbc=db.getCollection(“tm”);
}catch(Exception ex){System.out.println(ex.getMessage());}
}
public void insert(String key,int value){
try{
BasicDBObject obj=new BasicDBObject();
obj.put(“key”,key);
obj.put(“count”,value);
dbc.insert(obj);
}catch(Exception ex){System.out.println(ex.getMessage());}
}
}
1. 몽고 디비 서버 돌리기
# Start the MongoDB server using /usr/local/mongodb/data as its data directory.
# The option is "--dbpath" (two ASCII hyphens); an en-dash is not recognized by mongod.
cd /usr/local/mongodb/bin
sudo ./mongod --dbpath /usr/local/mongodb/data
2. R에서 몽고 디비 연동
콘솔에서 sudo R을 써서 R로 접속
# Read the per-keyword counts stored by the MapReduce job from mydb.tm and plot them.
library("KoNLP")  # not used by the lines below
library("rmongodb")
mongo <- mongo.create(host="localhost")
mongo.is.connected(mongo)
find_all <- mongo.find.all(mongo, ns="mydb.tm")
# One row per document. as.data.frame(find_all) puts every document into a single wide row
# (count, count.1, count.2, ...), and picking those columns by hand is easy to get wrong.
data <- data.frame(
  key   = sapply(find_all, function(d) d$key),
  count = sapply(find_all, function(d) d$count),
  stringsAsFactors = FALSE
)
a <- data$count
a
barplot(a, names.arg = data$key)
names(data)
mongo.destroy(mongo)