Flink 생 성 Hfile
팀 은 기술 창 고 를 통일 하기 위해 Flink 로 일괄 처리 와 흐름 계산 을 통일 적 으로 처리 하 는 것 에 일치 동의 했다.
질문 이 왔 습 니 다.
Flink 는 spark 에 비해 완선 되 지 않 은 것 같 습 니 다. spark 가 분 단위 로 해결 하 는 일 은 Flink 에서 머리 를 써 야 합 니 다.
spark 참조 하기;
object CreateHfile {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("CreateHfile").setMaster(args(0))
val sc = new SparkContext(conf)
val hbaseConf = HBaseConfiguration.create()
//
val rdd = sc.textFile(args(1))
.flatMap(v =>{
val x = new javaList[String]()
for( a v.toString)
.map(r =>(new ImmutableBytesWritable(Bytes.toBytes(r.toString)),
new KeyValue(Bytes.toBytes(r.toString), Bytes.toBytes("phoneFamliy"), Bytes.toBytes("phoneCol"),System.currentTimeMillis(),KeyValue.Type.DeleteColumn)))
rdd.saveAsNewAPIHadoopFile(args(2), classOf[ImmutableBytesWritable],classOf[KeyValue],classOf[HFileOutputFormat2], hbaseConf)
sc.stop()
}
}
MR 다시 보 겠 습 니 다.
public class HFileCreateJob {
private final static Logger log = LoggerFactory.getLogger(HFileCreateJob.class);
public void run(String input,String output,String env) throws Exception {
Configuration conf = new Configuration();
if("dev".equals(env)){
devHeader(conf) ;
}
try {
// ,
try {
FileSystem fs = FileSystem.get(URI.create(output), conf);
fs.delete(new Path(output), true);
fs.close();
} catch (IOException e) {
e.printStackTrace();
}
Job job = Job.getInstance(conf, "HFileCreateJob");
job.setJobName("Zhao@HFileCreateJob_V1.0");
job.setJarByClass(HFileCreateJob.class);
job.setMapperClass(HfileMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(HfileReducer.class);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(KeyValue.class);
FileInputFormat.addInputPath(job, new Path(input));
FileOutputFormat.setOutputPath(job, new Path(output));
job.setOutputFormatClass(HFileOutputFormat2.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
} catch (IOException e) {
e.printStackTrace();
}
}
private void devHeader(Configuration conf){
//
conf.set("mapreduce.app-submission.cross-platform", "true");
conf.set("mapreduce.job.ubertask.enable", "true");
conf.set("fs.defaultFS","hdfs://10.10.10.165:8020");
conf.set("mapreduce.job.jar","E:\\intermult-hbase\\target\\intermulthbase-1.0-SNAPSHOT.jar");
// hdfs
conf.set("mapreduce.input.fileinputformat.input.dir.recursive", "true");
System.setProperty("hadoop.home.dir", "D:\\soft\\developsoft\\Hadoop\\hadoop-2.6.5");
System.setProperty("HADOOP_USER_NAME", "hdfs");
}
public class HfileMapper extends Mapper {
private String rowKeySalt = ConfigFactory.load().getConfig("hfileCreate").getString("rowKeySalt") ;
@Override
protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {
String[] datas = value.toString().split("\\001");
String content = value.toString().replaceAll("\\001","\\!\\@\\#\\$") ;
Text rowKey = new Text(SHA256Util.getSHA256Str(datas[0] + rowKeySalt )) ;
context.write(rowKey,new Text(content));
}
}
public class HfileReducer extends Reducer {
private final static Logger logger = LoggerFactory.getLogger(HFileCreateJob.class);
private Config env = ConfigFactory.load().getConfig("hfileCreate") ;
private String family = env.getString("family") ;
private String column= env.getString("column") ;
@Override
protected void reduce(Text key, Iterable values, Reducer.Context context)
throws IOException, InterruptedException {
for (Text value : values) {
try{
String line = value.toString();
logger.error("line : " + line);
ImmutableBytesWritable rowkey = new ImmutableBytesWritable(key.toString().getBytes());
KeyValue kv = new KeyValue(key.toString().getBytes(), this.family.getBytes(), column.getBytes() , line.getBytes());
context.write(rowkey, kv);
}catch (Exception e){
logger.error("",e);
e.printStackTrace();
}
}
}
}
마지막 으로 Flink 의 프로젝트 입 니 다.
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
/**
* Flink Hfile
* https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/batch/hadoop_compatibility.html
* Created by geo on 2019/4/8. */
public class Application {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
devHeader(conf);
Job job = Job.getInstance(conf);
// HDFS
HadoopInputFormat hadoopIF =
new HadoopInputFormat(
new TextInputFormat(), LongWritable.class, Text.class, job
);
TextInputFormat.addInputPath(job, new Path("hdfs://2.2.2.2:8020/user/zhao/out0226/testHfile"));
// Flink
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet> textDataSet = env.createInput(hadoopIF);
DataSet> ds = textDataSet.map(v-> Tuple1.of(v.f1.toString()))
.returns(Types.TUPLE(Types.STRING))
.groupBy(0)
.sortGroup(0,Order.ASCENDING)
.reduceGroup(new createHfile());
//
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(KeyValue.class);
// HDFS
HadoopOutputFormat hadoopOF =
new HadoopOutputFormat(
new HFileOutputFormat2(), job
);
HFileOutputFormat2.setOutputPath(job, new Path("hdfs://10.111.32.165:8020/user/zhao/out0226/9/"));
job.setOutputFormatClass(HFileOutputFormat2.class);
ds.output(hadoopOF);
env.execute();
}
// Tuple2
public static final class createHfile extends RichGroupReduceFunction, Tuple2> {
@Override
public void reduce(Iterable> values, Collector> out) throws Exception {
String family="datasfamily";
String column="content";
for (Tuple1 key:values) {
ImmutableBytesWritable rowkey = new ImmutableBytesWritable(key.toString().getBytes());
KeyValue kv = new KeyValue(key.toString().getBytes(), family.getBytes(), column.getBytes() , key.f0.getBytes());
out.collect(Tuple2.of(rowkey,kv));
}
}
}
/**
*
* @param conf Configuration
*/
private static void devHeader(Configuration conf){
//
conf.set("mapreduce.app-submission.cross-platform", "true");
conf.set("mapreduce.job.ubertask.enable", "true");
conf.set("fs.defaultFS","hdfs://2.2.2.2:8020");
// hdfs
conf.set("mapreduce.input.fileinputformat.input.dir.recursive", "true");
System.setProperty("hadoop.home.dir", "D:\\soft\\developsoft\\Hadoop\\hadoop-2.6.5");
System.setProperty("HADOOP_USER_NAME", "hdfs");
}
}
자, pom 도 던 져.
UTF-8
1.8
1.8
1.1.5
org.apache.kafka
kafka_2.11
1.0.1
org.slf4j
slf4j-log4j12
jmxri
com.sun.jmx
jmxtools
com.sun.jdmk
jms
javax.jms
junit
junit
com.typesafe
config
1.2.1
junit
junit
3.8.1
test
ch.qos.logback
logback-core
${logback.version}
ch.qos.logback
logback-classic
${logback.version}
ch.qos.logback
logback-access
${logback.version}
commons-codec
commons-codec
RELEASE
net.alchim31.maven
scala-maven-plugin
3.2.2
compile
testCompile
org.apache.maven.plugins
maven-shade-plugin
2.4.3
package
shade
com.geotmt.dw.Application
마지막 에 쓰기:
Flink 는 새로운 기술 입 니 다. 그 가 자 라 는 것 을 천천히 지 켜 보 세 요.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
spark 의 2: 원리 소개Google Map/Reduce 를 바탕 으로 이 루어 진 Hadoop 은 개발 자 에 게 map, reduce 원 어 를 제공 하여 병렬 일괄 처리 프로그램 을 매우 간단 하고 아름 답 게 만 들 었 습 니 다.S...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.