Saturday, January 10, 2015

Camus example for Protocol Buffer & SequenceFile format in HDFS

Introduction
Linkedin/Camus is a Hadoop map-reduce job that reads messages from Kafka and stores them in HDFS. To read Protocol Buffer data with Camus you have to implement your own decoder; in this example the decoder simply passes the raw Kafka byte message through. To produce SequenceFile output you also implement a RecordWriterProvider that writes the files to HDFS, so that Elephant-bird + Hive can read them afterwards.


Required: Hadoop

A quick note on how I built my HDFS setup.
Environment: MacBook Air, OS X 10.10.1 + Java 1.7.
Quick install with Homebrew: brew install hadoop
The Hadoop version is 2.6.0.

Next, configure HDFS.
(Other platforms can refer to the official documentation.)
(I run Hadoop as a single daemon Java process on this machine,
so I followed the Pseudo-Distributed Operation section.)

On my Mac, go to /usr/local/Cellar/hadoop/2.6.0/libexec/etc/hadoop
and edit hdfs-site.xml to set the namenode & datanode directories.
Otherwise they default to the tmp folder and the data disappears after the next reboot.
My hdfs-site.xml sample:

 <configuration>
     <property>
         <name>dfs.replication</name>
         <value>1</value>
     </property>
     <property>
         <name>dfs.namenode.name.dir</name>
         <value>file:///Users/wilberchao/hdfs/nn</value>
     </property>
     <property>
         <name>dfs.datanode.data.dir</name>
         <value>file:///Users/wilberchao/hdfs/dn</value>
     </property>
 </configuration>


The <value> can be any path you like.
You also need to edit core-site.xml in the same etc/hadoop directory; I used the defaults from the official tutorial:

 <configuration>
     <property>
         <name>fs.defaultFS</name>
         <value>hdfs://localhost:9000</value>
     </property>
 </configuration>

Because Hadoop runs pseudo-distributed on the local machine,
the account running Hadoop needs permission to ssh into localhost.
First try ssh localhost or ssh 127.0.0.1 to see whether you can log in.
If you cannot, run these commands first:
 ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa  
 cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys  

Next, format the namenode to initialize the HDFS filesystem.
In /usr/local/Cellar/hadoop/2.6.0/libexec/bin run:
 # Format the filesystem:
 hdfs namenode -format  

Finally, go to /usr/local/Cellar/hadoop/2.6.0/libexec/sbin
and start Hadoop with start-dfs.sh.
(Make sure JAVA_HOME is set in your environment variables first.)
(If it is not, hadoop-env.sh under etc/hadoop has an export JAVA_HOME=xxx line where you can hard-code the path.)
Then visit http://localhost:50070/ to check whether Hadoop started successfully.

Under /usr/local/Cellar/hadoop/2.6.0/libexec/
you can try a few simple commands to check that HDFS is working:
 bin/hdfs dfs -mkdir /user  
 bin/hdfs dfs -mkdir /user/<username>  


Once Hadoop is confirmed working, you can start on Camus.

Camus

A quick rundown of what Camus gives you (or see the detailed explanation on the official GitHub page):

  • Manages the Kafka consumer offsets for you
  • Partitions the pulled data in HDFS for you
    • by topic, date, offset, partition, and so on
  • Produces a report with statistics after each map-reduce job run
  • Configurable: which topics to read, how much to pull at a time, how much historical data to read


First clone the Camus project:
 git clone https://github.com/linkedin/camus  

Implementing the Protocol Buffer decoder

 package com.linkedin.camus.etl.kafka.coders;
 import com.linkedin.camus.coders.CamusWrapper;
 import com.linkedin.camus.coders.MessageDecoder;
 import org.apache.log4j.Logger;
 import java.util.Properties;
 /**
  * MessageDecoder that passes the Kafka payload through as a raw byte array.
  * System.currentTimeMillis() is used to set the CamusWrapper's timestamp property.
  */
 public class ByteArrayMessageDecoder extends MessageDecoder<byte[], byte[]> {
   private static final Logger log = Logger.getLogger(ByteArrayMessageDecoder.class);
   @Override
   public void init(Properties props, String topicName) {
     this.props = props;
     this.topicName = topicName;
   }
   @Override
   public CamusWrapper<byte[]> decode(byte[] payload) {
     // No parsing: wrap the raw protobuf bytes and stamp them with the current time.
     return new CamusWrapper<byte[]>(payload, System.currentTimeMillis());
   }
 }

A quick explanation:

  • Which decoder to use depends on the format of your data in Kafka: there are text, Avro, JSON decoders and so on. This post is about Protocol Buffer, i.e. byte array data.
  • So we tell the superclass that both key and value are byte arrays ( MessageDecoder<byte[], byte[]> ).
  • In the overridden decode method, the payload is the protobuf byte message read from Kafka. Since we do no processing and store it in HDFS as a raw byte array, we simply wrap it in a CamusWrapper and return it.
  • The CamusWrapper timestamp affects how the data is later partitioned in HDFS. Here I just use the time the Camus job runs; strictly speaking it should be the time the message appeared in Kafka (see the sketch below).
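
As a hedged illustration of that last bullet, here is a minimal sketch of a decoder variant that would take the timestamp from a field inside the protobuf message rather than the wall clock. The class name EventTimeByteArrayMessageDecoder, the AddressBook message, and its getTimestamp() field are assumptions for illustration only, not part of Camus; adapt the commented parsing to your own .proto.

 package com.linkedin.camus.etl.kafka.coders;
 import com.linkedin.camus.coders.CamusWrapper;
 import com.linkedin.camus.coders.MessageDecoder;
 import java.util.Properties;
 /**
  * Sketch only: the same pass-through decoder, but the CamusWrapper timestamp
  * would come from the message itself rather than System.currentTimeMillis().
  */
 public class EventTimeByteArrayMessageDecoder extends MessageDecoder<byte[], byte[]> {
   @Override
   public void init(Properties props, String topicName) {
     this.props = props;
     this.topicName = topicName;
   }
   @Override
   public CamusWrapper<byte[]> decode(byte[] payload) {
     // Hypothetical: parse just enough of the payload to read an event-time field, e.g.
     //   AddressBook book = AddressBook.parseFrom(payload);  // your generated protobuf class
     //   long timestamp = book.getTimestamp();               // assumes such a field exists
     // and fall back to the wall clock if parsing fails.
     long timestamp = System.currentTimeMillis();  // placeholder so the sketch stays compilable
     return new CamusWrapper<byte[]>(payload, timestamp);  // the raw bytes are still what gets stored
   }
 }

If you went this route, camus.message.decoder.class in the config shown later would point at this class instead of ByteArrayMessageDecoder.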
The ByteArrayMessageDecoder above goes into the Camus project at
camus/camus-kafka-coders/src/main/java/com/linkedin/camus/etl/kafka/coders/ByteArrayMessageDecoder.java

Implementing the RecordWriterProvider

The RecordWriterProvider interface provides the methods that tell Camus how each payload should be written to HDFS.
You can define the delimiter appended after each payload, for example the string "\n" or a (byte)0x0,
and you can even compress the data in HDFS by specifying a compression format.

This example outputs SequenceFile format in HDFS:
 package com.linkedin.camus.etl.kafka.common;  
 import com.linkedin.camus.coders.CamusWrapper;  
 import com.linkedin.camus.etl.IEtlKey;  
 import com.linkedin.camus.etl.RecordWriterProvider;  
 import com.linkedin.camus.etl.kafka.mapred.EtlMultiOutputFormat;  
 import org.apache.hadoop.conf.Configuration;  
 import org.apache.hadoop.fs.Path;  
 import org.apache.hadoop.io.BytesWritable;  
 import org.apache.hadoop.io.LongWritable;  
 import org.apache.hadoop.io.SequenceFile;  
 import org.apache.hadoop.io.SequenceFile.CompressionType;  
 import org.apache.hadoop.io.Text;  
 import org.apache.hadoop.io.compress.CompressionCodec;  
 import org.apache.hadoop.io.compress.DefaultCodec;  
 import org.apache.hadoop.mapreduce.RecordWriter;  
 import org.apache.hadoop.mapreduce.TaskAttemptContext;  
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;  
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;  
 import org.apache.hadoop.util.ReflectionUtils;  
 import org.apache.log4j.Logger;  
 import java.io.IOException;  
 /**  
  * Provides a RecordWriter that uses SequenceFile.Writer to write  
  * SequenceFiles records to HDFS. Compression settings are controlled via  
  * the usual hadoop configuration values.  
  * <p/>  
  * - mapreduce.output.fileoutputformat.compress     - true or false  
  * - mapreduce.output.fileoutputformat.compress.codec  - org.apache.hadoop.io.compress.* (SnappyCodec, etc.)  
  * - mapreduce.output.fileoutputformat.compress.type  - BLOCK or RECORD  
  */  
 public class SequenceFileProtobufRecordWriterProvider implements RecordWriterProvider {  
   public static final String ETL_OUTPUT_RECORD_DELIMITER = "etl.output.record.delimiter";  
   public static final String DEFAULT_RECORD_DELIMITER = "";  
   private static Logger log = Logger.getLogger(SequenceFileProtobufRecordWriterProvider.class);  
   protected String recordDelimiter = null;  
   public SequenceFileProtobufRecordWriterProvider(TaskAttemptContext context) {  
   }  
   // TODO: Make this configurable somehow.  
   // To do this, we'd have to make SequenceFileRecordWriterProvider have an  
   // init(JobContext context) method signature that EtlMultiOutputFormat would always call.  
   @Override  
   public String getFilenameExtension() {  
     return "";  
   }  
   @Override  
   public RecordWriter<IEtlKey, CamusWrapper> getDataRecordWriter(TaskAttemptContext context, String fileName,  
                                   CamusWrapper camusWrapper, FileOutputCommitter committer) throws IOException, InterruptedException {  
     Configuration conf = context.getConfiguration();  
     // If recordDelimiter hasn't been initialized, do so now  
     if (recordDelimiter == null) {  
       recordDelimiter = conf.get(ETL_OUTPUT_RECORD_DELIMITER, DEFAULT_RECORD_DELIMITER);  
     }  
     CompressionCodec compressionCodec = null;  
     CompressionType compressionType = CompressionType.NONE;  
     // Determine compression type (BLOCK or RECORD) and compression codec to use.  
     if (SequenceFileOutputFormat.getCompressOutput(context)) {  
       compressionType = SequenceFileOutputFormat.getOutputCompressionType(context);  
       Class<?> codecClass = SequenceFileOutputFormat.getOutputCompressorClass(context, DefaultCodec.class);  
       // Instantiate the CompressionCodec Class  
       compressionCodec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);  
     }  
     // Get the filename for this RecordWriter.  
     Path path =  
         new Path(committer.getWorkPath(), EtlMultiOutputFormat.getUniqueFile(context, fileName, getFilenameExtension()));  
     log.info("Creating new SequenceFile.Writer with compression type " + compressionType + " and compression codec "  
         + (compressionCodec != null ? compressionCodec.getClass().getName() : "null"));  
     final SequenceFile.Writer writer =  
         SequenceFile.createWriter(path.getFileSystem(conf), conf, path, LongWritable.class, BytesWritable.class,  
             compressionType, compressionCodec, context);  
     // Return a new anonymous RecordWriter that uses the  
     // SequenceFile.Writer to write data to HDFS  
     return new RecordWriter<IEtlKey, CamusWrapper>() {  
       @Override  
       public void write(IEtlKey key, CamusWrapper data) throws IOException, InterruptedException {    
         writer.append(new LongWritable(key.getTime()), new BytesWritable((byte[]) data.getRecord()));  
       }  
       @Override  
       public void close(TaskAttemptContext context) throws IOException, InterruptedException {  
         writer.close();  
       }  
     };  
   }  
 }  


The parts worth paying attention to:
Creating the SequenceFile writer
 final SequenceFile.Writer writer =  
         SequenceFile.createWriter(path.getFileSystem(conf), conf, path, LongWritable.class, BytesWritable.class,  
             compressionType, compressionCodec, context);  

BytesWritable.class is passed to the writer so the data really is stored as the raw protocol buffer bytes.

and the implementation inside the returned RecordWriter:
 return new RecordWriter<IEtlKey, CamusWrapper>() {  
       @Override  
       public void write(IEtlKey key, CamusWrapper data) throws IOException, InterruptedException {  
         writer.append(new LongWritable(key.getTime()), new BytesWritable((byte[]) data.getRecord()));  
       }  
       @Override  
       public void close(TaskAttemptContext context) throws IOException, InterruptedException {  
         writer.close();  
       }  
     };  

Put simply, each payload is written as one BytesWritable record, which becomes the SequenceFile format.

Finally, this file goes into
camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/SequenceFileProtobufRecordWriterProvider.java
and that part is done.

Lastly, here is the configuration file to feed the Camus job,
so it knows which decoder and RecordWriterProvider to use, plus the Kafka address and the topics to read.

config.properties:
 # The job name.  
 camus.job.name=wilber Camus  
 # final top-level data output directory, sub-directory will be dynamically created for each topic pulled  
 etl.destination.path=/camus/topics  
 # HDFS location where you want to keep execution files, i.e. offsets, error logs, and count files  
 etl.execution.base.path=/camus/exec  
 # where completed Camus job output directories are kept, usually a sub-dir in the base.path  
 etl.execution.history.path=/camus/exec/history  
 camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.ByteArrayMessageDecoder  
 # The record writer for Hadoop  
 etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.SequenceFileProtobufRecordWriterProvider  
 # max hadoop tasks to use, each task can pull multiple topic partitions  
 mapred.map.tasks=10  
 # max historical time that will be pulled from each partition based on event timestamp  
 kafka.max.pull.hrs=1  
 # events with a timestamp older than this will be discarded.  
 kafka.max.historical.days=3  
 # Max minutes for each mapper to pull messages (-1 means no limit)  
 kafka.max.pull.minutes.per.task=-1  
 # if whitelist has values, only whitelisted topic are pulled. Nothing on the blacklist is pulled  
 kafka.blacklist.topics=  
 kafka.whitelist.topics=wilber_test_2_addressbook  
 log4j.configuration=false  
 # Name of the client as seen by kafka  
 kafka.client.name=camus  
 # The Kafka brokers to connect to, format: kafka.brokers=host1:port,host2:port,host3:port  
 kafka.brokers=localhost:9092  
 #Stops the mapper from getting inundated with Decoder exceptions for the same topic  
 #Default value is set to 10  
 max.decoder.exceptions.to.print=5  
 #Controls the submitting of counts to Kafka  
 #Default value set to true  
 post.tracking.counts.to.kafka=false  
 #monitoring.event.class=class.that.generates.record.to.submit.counts.to.kafka  
 # everything below this point can be ignored for the time being, will provide more documentation down the road  
 ##########################  
 etl.run.tracking.post=false  
 kafka.monitor.tier=  
 etl.counts.path=  
 kafka.monitor.time.granularity=10  
 #etl.hourly=hourly  
 etl.daily=daily  
 # Should we ignore events that cannot be decoded (exception thrown by MessageDecoder)?  
 # `false` will fail the job, `true` will silently drop the event.  
 etl.ignore.schema.errors=false  
 # configure output compression for deflate or snappy. Defaults to deflate  
 mapred.output.compress=false  
 etl.output.codec=gzip  
 etl.deflate.level=6  
 #etl.output.codec=snappy  
 etl.default.timezone=America/Los_Angeles  
 etl.output.file.time.partition.mins=60  
 etl.keep.count.files=false  
 etl.execution.history.max.of.quota=.8  
 mapred.map.max.attempts=1  
 kafka.client.buffer.size=20971520  
 kafka.client.so.timeout=60000  

Then you can package everything into a jar for Hadoop to run.

Under the camus project folder, run the Maven command (if you don't have Maven: brew install maven):
 mvn package -DskipTests  

Then under camus/camus-example/target there will be a
camus-example-0.1.0-SNAPSHOT-shaded.jar for running the Camus job.
(Remember to put config.properties there as well, to make running it easier.)

Run the command:
 hadoop jar camus-example-0.1.0-SNAPSHOT-shaded.jar com.linkedin.camus.etl.kafka.CamusJob -P config.properties  

When it finishes and you see the execution report,
you can check the folders in HDFS to see whether the topic data is there.
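
If you want a check that goes beyond listing directories, the following is a minimal sketch (not part of Camus) that reads one of the written SequenceFiles back with the Hadoop client API. The class name SequenceFileDump, the file path, and the commented protobuf parsing are assumptions for illustration; point it at a real output file under etl.destination.path.

 // Minimal sketch: read back one of the SequenceFiles the job wrote and inspect the records.
 import java.util.Arrays;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;

 public class SequenceFileDump {
   public static void main(String[] args) throws Exception {
     Configuration conf = new Configuration();
     conf.set("fs.defaultFS", "hdfs://localhost:9000");
     // Hypothetical path: substitute an actual file under /camus/topics/<topic>/...
     Path path = new Path(args.length > 0 ? args[0] : "/camus/topics/wilber_test_2_addressbook/SOME_OUTPUT_FILE");
     SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
     try {
       LongWritable key = new LongWritable();      // the timestamp the RecordWriter wrote
       BytesWritable value = new BytesWritable();  // the raw protobuf bytes
       while (reader.next(key, value)) {
         byte[] bytes = Arrays.copyOf(value.getBytes(), value.getLength());
         // Hypothetical: parse with your generated protobuf class, e.g.
         //   AddressBook book = AddressBook.parseFrom(bytes);
         System.out.println("timestamp=" + key.get() + " payloadBytes=" + bytes.length);
       }
     } finally {
       reader.close();
     }
   }
 }

Each record should come back as a LongWritable timestamp key plus a BytesWritable value holding the original protobuf bytes, matching what the RecordWriter above wrote.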

Next up is Elephant-bird + Hive.
To be continued.

End

