우분투 트위터+맵리듀스

우분투 트위터+맵리듀스

log4j.properties

 # Root Logger
log4j.rootLogger=INFO, AppFile

# console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d [%t] %-5p %l – %m%n
#log4j.appender.console.encoding=EUC-KR

# AppFile
log4j.appender.AppFile=org.apache.log4j.DailyRollingFileAppender
#log4j.appender.AppFile.File=/data1/services/nwofelis/htdocs/logs/app.log
log4j.appender.AppFile.File=app.log
log4j.appender.AppFile.Append=true
log4j.appender.AppFile.layout=org.apache.log4j.PatternLayout
log4j.appender.AppFile.layout.ConversionPattern=%n%d%n\%p : %l%n%m%n%n
log4j.appender.AppFile.DatePattern=’.’yyyy-MM-dd
#log4j.appender.AppFile.encoding=EUC-KR

twitter4j.properties

# OAuth credentials, presumably picked up by twitter4j from this file - confirm it is on the classpath.
# NOTE(review): every value below is a placeholder; the trailing "(fake)" marker written in Korean
# is part of the value as pasted, so replace each whole value with a real credential before use.
oauth.consumerSecret=mgfgertt(가짜)
oauth.accessToken=302ertertdJi4Z(가짜)
oauth.accessTokenSecret=2aihnAo1ertertre9trUDyx4wH(가짜)
oauth.consumerKey=tO0ertetQjt7(가짜)

TwitterListner.java

package com.sist.mapred3;

import org.apache.log4j.Logger;

import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;

public class TwitterListner implements StatusListener{
    private Logger logger=Logger.getLogger(TwitterListner.class);
   
    @Override
    public void onException(Exception ex) {
        // TODO Auto-generated method stub
        System.out.println(ex.getMessage());
    }

    @Override
    public void onDeletionNotice(StatusDeletionNotice arg0) {
        // TODO Auto-generated method stub
       
    }

    @Override
    public void onScrubGeo(long arg0, long arg1) {
        // TODO Auto-generated method stub
       
    }

    @Override
    public void onStallWarning(StallWarning arg0) {
        // TODO Auto-generated method stub
       
    }

    @Override
    public void onStatus(Status status) {
        // TODO Auto-generated method stub
        System.out.println(“ID:”+status.getUser().getScreenName());
        System.out.println(“Date:”+status.getCreatedAt());
        System.out.println(“Message:”+status.getText());
        logger.info(status.getText());
       
    }

    @Override
    public void onTrackLimitationNotice(int arg0) {
        // TODO Auto-generated method stub
       
    }
   
}

TwitterMain.java

package com.sist.mapred3;

import twitter4j.FilterQuery;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;

public class TwitterMain {

    public static void main(String[] args) throws Exception{
        TwitterStream ts=new TwitterStreamFactory().getInstance();
        FilterQuery fq=new FilterQuery();
        String[] data={“메르스”,”표절”,”야구”,”연평해전”,”6.25″};
        fq.track(data);
        TwitterListner listen=new TwitterListner();
        ts.addListener(listen);
        ts.filter(fq);
    }

}

TwitterDriver.java

package com.sist.mapred4;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.*;
import java.nio.file.Files;

public class TwitterDriver {
    public static void main(String[] args) throws Exception
    {
        try
        {
            //절대 경로명
            File dir=new File(“/home/sist/bigdataStudy/MapReduceProject3/output”);
            File[] files=dir.listFiles();
            if(dir.exists()){
            for(File f:files)
            {
                f.delete();
            }
            dir.delete();
            }
        }catch(Exception ex){}
        //환경설정
        Configuration conf=new Configuration();
        //작업설정(이름부여)
        Job job=new Job(conf,”Twitter”);
        //실행파일 등록
        job.setJarByClass(TwitterDriver.class);
        //매퍼등록
        job.setMapperClass(TwitterMapper.class);
        //리듀스등록
        job.setReducerClass(TwitterReducer.class);
        //결과값 출력 등록
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //파일 읽기 => 파일 출력
        FileInputFormat.addInputPath(job, new Path(“./app.log”));
        FileOutputFormat.setOutputPath(job, new Path(“./output”));
        //실행요청
        job.waitForCompletion(true);
        System.out.println(“데이터처리 완료”);
       
    }
}

TwitterMapper.java

package com.sist.mapred4;

import java.io.IOException;
import java.util.regex.Pattern;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.util.regex.*;
public class TwitterMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    private final static IntWritable one=new IntWritable(1);
   
    private final String regex1=”메르스”;
    private final String regex2=”표절”;
    private final String regex3=”야구”;
    private final String regex4=”연평해전”;
    private final String regex5=”625″;
   
    private Text outkey=new Text();
    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        Pattern p1=Pattern.compile(regex1);
        Pattern p2=Pattern.compile(regex2);
        Pattern p3=Pattern.compile(regex3);
        Pattern p4=Pattern.compile(regex4);
        Pattern p5=Pattern.compile(regex5);
   
        Matcher m1=p1.matcher(value.toString());
        Matcher m2=p2.matcher(value.toString());
        Matcher m3=p3.matcher(value.toString());
        Matcher m4=p4.matcher(value.toString());
        Matcher m5=p5.matcher(value.toString());
        try
        {
            while(m1.find())
            {
                outkey.set(regex1);
                context.write(outkey, one);
            }
            while(m2.find())
            {
                outkey.set(regex2);
                context.write(outkey, one);
            }
            while(m3.find())
            {
                outkey.set(regex3);
                context.write(outkey, one);
            }
            while(m4.find())
            {
                outkey.set(regex4);
                context.write(outkey, one);
            }
            while(m5.find())
            {
                outkey.set(regex5);
                context.write(outkey, one);
            }
        }catch(Exception ex){}
    }
   
}

TwitterReducer.java

package com.sist.mapred4;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import com.sist.mongo.*;
//통합된 상태
public class TwitterReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    private IntWritable result=new IntWritable();
    //private Text key=new Text();
    private TwitterDAO dao=new TwitterDAO();
    @Override
    protected void reduce(Text key, Iterable<IntWritable> value,
            Reducer<Text, IntWritable, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        int sum=0;
        for(IntWritable i:value)
        {
            sum+=i.get();
        }
        result.set(sum);
        context.write(key, result);
        dao.insert(key.toString(), sum);
    }
   
}

TwitterDAO.java

package com.sist.mongo;

import com.mongodb.*;

public class TwitterDAO {
    private MongoClient mc;
    private DB db;
    private DBCollection dbc;
    public TwitterDAO(){
        try{
            mc=new MongoClient(“localhost”);
            db=mc.getDB(“mydb”);
            dbc=db.getCollection(“tm”);
           
        }catch(Exception ex){System.out.println(ex.getMessage());}
    }
    public void insert(String key,int value){
        try{
            BasicDBObject obj=new BasicDBObject();
            obj.put(“key”,key);
            obj.put(“count”,value);
            dbc.insert(obj);
           
        }catch(Exception ex){System.out.println(ex.getMessage());}
    }
}
 

1. 몽고 디비 서버돌리기 

cd /usr/local/mongodb/bin

sudo ./mongod --dbpath /usr/local/mongodb/data

2. R 에서 몽고 디비 연동 

콘솔에서 sudo R을 써서 R로 접속

# Load the packages used by this session (string literals use plain ASCII quotes so R can parse them).
library("KoNLP")
library("rmongodb")
# Connect to the MongoDB server on localhost and check that the connection is up.
mongo<-mongo.create(host="localhost")
mongo.is.connected(mongo)

# Fetch every document from namespace mydb.tm and convert the result to a data frame.
find_all<-mongo.find.all(mongo,ns="mydb.tm")
data<-as.data.frame(find_all)
# Collect the count columns to plot.
# NOTE(review): count.1 is not included here - confirm whether skipping it is intentional.
a<-c(data$count,data$count.2,data$count.3,data$count.4)
a
barplot(a)
# Show the column names produced by the conversion.
names(data)