LinkedIn/Camus is a Hadoop map-reduce job that reads messages from Kafka and stores them to HDFS. To consume protocol buffer data with Camus you have to implement your own decoder; in this example the decoder simply passes the raw Kafka byte message through, and to get sequence file output you implement a RecordWriterProvider that writes the files to HDFS so that Elephant-bird + Hive can read them later.
required: Hadoop
Here is a quick rundown of how I set up HDFS. The environment is a MacBook Air running OS X 10.10.1 with Java 1.7.
Install Hadoop quickly with Homebrew: brew install hadoop
The Hadoop version is 2.6.0.
Next, configure HDFS.
(Other platforms can refer to the official documentation.)
(I run Hadoop as a single daemon Java process on the system,
so I followed the Pseudo-Distributed Operation section.)
On my Mac, go to /usr/local/Cellar/hadoop/2.6.0/libexec/etc/hadoop
and edit hdfs-site.xml to set the namenode & datanode directories;
otherwise they default to somewhere under the tmp folder and the data disappears after the next reboot.
My hdfs-site.xml sample:
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
  <property>
    <name>dfs.namenode.name.dir</name>
    <value>file:///Users/wilberchao/hdfs/nn</value>
  </property>
  <property>
    <name>dfs.datanode.data.dir</name>
    <value>file:///Users/wilberchao/hdfs/dn</value>
  </property>
</configuration>
The <value> elements can point to any paths you like.
You also need to edit core-site.xml in the same etc/hadoop directory; I used the defaults from the official tutorial:
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost:9000</value>
  </property>
</configuration>
Because Hadoop runs pseudo-distributed on the local machine,
the account that runs Hadoop needs to be able to ssh into localhost.
First try ssh localhost or ssh 127.0.0.1 to see whether you can log in.
If you cannot,
run these commands first:
ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys
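Depending on your ssh setup you may also need to tighten the permissions on the key file (and on OS X, make sure Remote Login is enabled under System Preferences > Sharing):
chmod 0600 ~/.ssh/authorized_keys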
Next, format the namenode so the directories are in HDFS format.
In /usr/local/Cellar/hadoop/2.6.0/libexec/bin run:
//Format the filesystem:
hdfs namenode -format
Finally, go to /usr/local/Cellar/hadoop/2.6.0/libexec/sbin
and start Hadoop with start-dfs.sh.
(First make sure JAVA_HOME is set in your environment;
if it is not, hadoop-env.sh under etc/hadoop has an export JAVA_HOME=xxx line where you can hard-code the path.)
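For example, on OS X one way to set it (assuming Java 1.7 as in this setup) is:
export JAVA_HOME=$(/usr/libexec/java_home -v 1.7)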
Then open http://localhost:50070/ to check whether Hadoop started successfully.
Under /usr/local/Cellar/hadoop/2.6.0/libexec/
you can try a few simple commands to check that HDFS is working:
bin/hdfs dfs -mkdir /user
bin/hdfs dfs -mkdir /user/<username>
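For example, a quick end-to-end check is to upload a small local file and list it back (the file used here is just an example, any small file will do):
bin/hdfs dfs -put etc/hadoop/core-site.xml /user/<username>/
bin/hdfs dfs -ls /user/<username>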
Once Hadoop is confirmed working, you can start on Camus.
Camus
A quick summary of what Camus gives you (or see the official GitHub README for details):
- it manages the Kafka consumer offsets for you
- it partitions the pulled data in HDFS for you
  - by topic, date, offset, partition, and so on
- after each map-reduce job it produces a report summarizing that run
- it is configurable: which topics to read, how much to pull per run, how much historical data to read
First, pull down the Camus project:
git clone https://github.com/linkedin/camus
Implement the protocol buffer decoder
package com.linkedin.camus.etl.kafka.coders;

import com.linkedin.camus.coders.CamusWrapper;
import com.linkedin.camus.coders.MessageDecoder;

import org.apache.log4j.Logger;

import java.util.Properties;

/**
 * MessageDecoder class that will convert the payload into a byte array object.
 * System.currentTimeMillis() will be used to set CamusWrapper's timestamp property.
 *
 * This MessageDecoder returns a CamusWrapper that works with byte array payloads.
 */
public class ByteArrayMessageDecoder extends MessageDecoder<byte[], byte[]> {
  private static final Logger log = Logger.getLogger(ByteArrayMessageDecoder.class);

  @Override
  public void init(Properties props, String topicName) {
    this.props = props;
    this.topicName = topicName;
  }

  @Override
  public CamusWrapper<byte[]> decode(byte[] payload) {
    // Pass the raw protocol buffer bytes through untouched and stamp them
    // with the time the Camus job is running.
    return new CamusWrapper<byte[]>(payload, System.currentTimeMillis());
  }
}
A few notes:
- Which decoder you use depends on the format of your data in Kafka: text, Avro, JSON, and so on. This post is about protocol buffers, i.e. byte array data.
- So we tell the MessageDecoder superclass that both key and value are byte arrays (MessageDecoder<byte[], byte[]>).
- In the overridden decode method, the payload is the protocol buffer byte message read from Kafka. Since we do no processing and store it in HDFS as a raw byte array, we just wrap it in a CamusWrapper and return it.
- The CamusWrapper timestamp determines how the data is partitioned in HDFS later. Here we simply use the time the Camus job runs; strictly speaking it should be the time the message appeared in Kafka (see the sketch right after this list).
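If your protocol buffer messages carry their own timestamp, a variant of decode() could use it instead. This is only a sketch, meant as a drop-in replacement for the decode method above; AddressBookProto.Entry and its getTimestamp() field are hypothetical stand-ins for your own generated class.
@Override
public CamusWrapper<byte[]> decode(byte[] payload) {
  try {
    // AddressBookProto.Entry is a hypothetical generated protobuf class; use your own message type.
    long eventTime = AddressBookProto.Entry.parseFrom(payload).getTimestamp();
    return new CamusWrapper<byte[]>(payload, eventTime);
  } catch (com.google.protobuf.InvalidProtocolBufferException e) {
    // Fall back to the job's wall-clock time if the payload cannot be parsed.
    return new CamusWrapper<byte[]>(payload, System.currentTimeMillis());
  }
}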
Put the file at:
camus/camus-kafka-coders/src/main/java/com/linkedin/camus/etl/kafka/coders/ByteArrayMessageDecoder.java
Implement the RecordWriter
The RecordWriterProvider interface provides the methods that tell Camus how each payload should be written to HDFS. You can define the delimiter appended after every payload, for example the String "\n" or a (byte)0x0,
and you can even compress the data in HDFS by specifying a compression format.
The example here outputs sequence file format in HDFS.
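If you do want compressed sequence files, a sketch of the standard Hadoop output-compression properties (the same ones documented in the class javadoc below) looks like this; the example config.properties later in this post already sets Hadoop properties such as mapred.output.compress in the same way, and SnappyCodec assumes the native Snappy libraries are available:
mapreduce.output.fileoutputformat.compress=true
mapreduce.output.fileoutputformat.compress.type=BLOCK
mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec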
package com.linkedin.camus.etl.kafka.common;

import com.linkedin.camus.coders.CamusWrapper;
import com.linkedin.camus.etl.IEtlKey;
import com.linkedin.camus.etl.RecordWriterProvider;
import com.linkedin.camus.etl.kafka.mapred.EtlMultiOutputFormat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.log4j.Logger;

import java.io.IOException;

/**
 * Provides a RecordWriter that uses SequenceFile.Writer to write
 * SequenceFile records to HDFS. Compression settings are controlled via
 * the usual hadoop configuration values:
 *
 * - mapreduce.output.fileoutputformat.compress - true or false
 * - mapreduce.output.fileoutputformat.compress.codec - org.apache.hadoop.io.compress.* (SnappyCodec, etc.)
 * - mapreduce.output.fileoutputformat.compress.type - BLOCK or RECORD
 */
public class SequenceFileProtobufRecordWriterProvider implements RecordWriterProvider {
  public static final String ETL_OUTPUT_RECORD_DELIMITER = "etl.output.record.delimiter";
  public static final String DEFAULT_RECORD_DELIMITER = "";

  private static Logger log = Logger.getLogger(SequenceFileProtobufRecordWriterProvider.class);

  protected String recordDelimiter = null;

  public SequenceFileProtobufRecordWriterProvider(TaskAttemptContext context) {
  }

  // TODO: Make this configurable somehow.
  // To do this, we'd have to make SequenceFileRecordWriterProvider have an
  // init(JobContext context) method signature that EtlMultiOutputFormat would always call.
  @Override
  public String getFilenameExtension() {
    return "";
  }

  @Override
  public RecordWriter<IEtlKey, CamusWrapper> getDataRecordWriter(TaskAttemptContext context, String fileName,
      CamusWrapper camusWrapper, FileOutputCommitter committer) throws IOException, InterruptedException {
    Configuration conf = context.getConfiguration();

    // If recordDelimiter hasn't been initialized, do so now
    if (recordDelimiter == null) {
      recordDelimiter = conf.get(ETL_OUTPUT_RECORD_DELIMITER, DEFAULT_RECORD_DELIMITER);
    }

    CompressionCodec compressionCodec = null;
    CompressionType compressionType = CompressionType.NONE;

    // Determine compression type (BLOCK or RECORD) and compression codec to use.
    if (SequenceFileOutputFormat.getCompressOutput(context)) {
      compressionType = SequenceFileOutputFormat.getOutputCompressionType(context);
      Class<?> codecClass = SequenceFileOutputFormat.getOutputCompressorClass(context, DefaultCodec.class);
      // Instantiate the CompressionCodec Class
      compressionCodec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
    }

    // Get the filename for this RecordWriter.
    Path path =
        new Path(committer.getWorkPath(), EtlMultiOutputFormat.getUniqueFile(context, fileName, getFilenameExtension()));

    log.info("Creating new SequenceFile.Writer with compression type " + compressionType + " and compression codec "
        + (compressionCodec != null ? compressionCodec.getClass().getName() : "null"));

    final SequenceFile.Writer writer =
        SequenceFile.createWriter(path.getFileSystem(conf), conf, path, LongWritable.class, BytesWritable.class,
            compressionType, compressionCodec, context);

    // Return a new anonymous RecordWriter that uses the
    // SequenceFile.Writer to write data to HDFS
    return new RecordWriter<IEtlKey, CamusWrapper>() {
      @Override
      public void write(IEtlKey key, CamusWrapper data) throws IOException, InterruptedException {
        writer.append(new LongWritable(key.getTime()), new BytesWritable((byte[]) data.getRecord()));
      }

      @Override
      public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        writer.close();
      }
    };
  }
}
Points worth noting:
Creating the sequence file writer:
final SequenceFile.Writer writer =
SequenceFile.createWriter(path.getFileSystem(conf), conf, path, LongWritable.class, BytesWritable.class,
compressionType, compressionCodec, context);
Specifying BytesWritable.class for the writer makes sure the data really is stored as the raw protocol buffer bytes.
And the implementation inside the returned RecordWriter:
return new RecordWriter<IEtlKey, CamusWrapper>() {
@Override
public void write(IEtlKey key, CamusWrapper data) throws IOException, InterruptedException {
writer.append(new LongWritable(key.getTime()), new BytesWritable((byte[]) data.getRecord()));
}
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
writer.close();
}
};
Put plainly, it appends the records one BytesWritable at a time, producing sequence file format.
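To sanity-check what Camus wrote, a minimal stand-alone reader could dump the keys and value sizes back out. This is not part of Camus; the class name is just an example and the file path is passed as an argument:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;

public class SequenceFileDump {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Path to one of the files Camus wrote under etl.destination.path.
    Path path = new Path(args[0]);
    SequenceFile.Reader reader = new SequenceFile.Reader(path.getFileSystem(conf), path, conf);
    try {
      LongWritable key = new LongWritable();     // the CamusWrapper timestamp
      BytesWritable value = new BytesWritable(); // the raw protocol buffer bytes
      while (reader.next(key, value)) {
        System.out.println("timestamp=" + key.get() + ", bytes=" + value.getLength());
      }
    } finally {
      reader.close();
    }
  }
}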
Finally, put this file at
camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/SequenceFileProtobufRecordWriterProvider.java
and you are done.
Finally, here is the configuration file to feed to the Camus job
so it knows which decoder and RecordWriter to use, along with the Kafka address and the topics to read.
config.properties:
# The job name.
camus.job.name=wilber Camus
# final top-level data output directory, sub-directory will be dynamically created for each topic pulled
etl.destination.path=/camus/topics
# HDFS location where you want to keep execution files, i.e. offsets, error logs, and count files
etl.execution.base.path=/camus/exec
# where completed Camus job output directories are kept, usually a sub-dir in the base.path
etl.execution.history.path=/camus/exec/history
camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.ByteArrayMessageDecoder
# The record writer for Hadoop
etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.SequenceFileProtobufRecordWriterProvider
# max hadoop tasks to use, each task can pull multiple topic partitions
mapred.map.tasks=10
# max historical time that will be pulled from each partition based on event timestamp
kafka.max.pull.hrs=1
# events with a timestamp older than this will be discarded.
kafka.max.historical.days=3
# Max minutes for each mapper to pull messages (-1 means no limit)
kafka.max.pull.minutes.per.task=-1
# if whitelist has values, only whitelisted topic are pulled. Nothing on the blacklist is pulled
kafka.blacklist.topics=
kafka.whitelist.topics=wilber_test_2_addressbook
log4j.configuration=false
# Name of the client as seen by kafka
kafka.client.name=camus
# The Kafka brokers to connect to, format: kafka.brokers=host1:port,host2:port,host3:port
kafka.brokers=localhost:9092
#Stops the mapper from getting inundated with Decoder exceptions for the same topic
#Default value is set to 10
max.decoder.exceptions.to.print=5
#Controls the submitting of counts to Kafka
#Default value set to true
post.tracking.counts.to.kafka=false
#monitoring.event.class=class.that.generates.record.to.submit.counts.to.kafka
# everything below this point can be ignored for the time being, will provide more documentation down the road
##########################
etl.run.tracking.post=false
kafka.monitor.tier=
etl.counts.path=
kafka.monitor.time.granularity=10
#etl.hourly=hourly
etl.daily=daily
# Should we ignore events that cannot be decoded (exception thrown by MessageDecoder)?
# `false` will fail the job, `true` will silently drop the event.
etl.ignore.schema.errors=false
# configure output compression for deflate or snappy. Defaults to deflate
mapred.output.compress=false
etl.output.codec=gzip
etl.deflate.level=6
#etl.output.codec=snappy
etl.default.timezone=America/Los_Angeles
etl.output.file.time.partition.mins=60
etl.keep.count.files=false
etl.execution.history.max.of.quota=.8
mapred.map.max.attempts=1
kafka.client.buffer.size=20971520
kafka.client.so.timeout=60000
Then you can package everything into a jar for Hadoop to run.
In the camus project folder, run the Maven command (if you do not have Maven: brew install maven):
mvn package -DskipTests
Then under camus/camus-example/target you will find
camus-example-0.1.0-SNAPSHOT-shaded.jar for running the Camus job.
(Remember to copy config.properties there as well to make it easy to run.)
Run the command:
hadoop jar camus-example-0.1.0-SNAPSHOT-shaded.jar com.linkedin.camus.etl.kafka.CamusJob -P config.properties
Wait for it to finish and check the execution report,
then look under HDFS to see whether the folders contain the topic data.
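For example (the path comes from etl.destination.path in the config above; the exact sub-directory layout depends on topic, date, and partition):
hdfs dfs -ls -R /camus/topics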
Next up is Elephant-bird + Hive.
To be continued.
End