MapReduce In-Depth Analysis: The 5 Major Video Sites
Published: 2019-06-13


Reference code

TVPlayCount.java

package com.dajiangtai.hadoop.tvplay;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class TVPlayCount extends Configured implements Tool {

    public static class TVPlayMapper extends Mapper<Text, TVPlayData, Text, TVPlayData> {
        @Override
        protected void map(Text key, TVPlayData value, Context context)
                throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    public static class TVPlayReducer extends Reducer<Text, TVPlayData, Text, Text> {
        private Text m_key = new Text();
        private Text m_value = new Text();
        private MultipleOutputs<Text, Text> mos;

        // Open the multiple outputs.
        protected void setup(Context context) throws IOException, InterruptedException {
            mos = new MultipleOutputs<Text, Text>(context);
        }

        protected void reduce(Text Key, Iterable<TVPlayData> Values, Context context)
                throws IOException, InterruptedException {
            int daynumber = 0;
            int collectnumber = 0;
            int commentnumber = 0;
            int againstnumber = 0;
            int supportnumber = 0;
            for (TVPlayData tv : Values) {
                daynumber += tv.getDaynumber();
                collectnumber += tv.getCollectnumber();
                commentnumber += tv.getCommentnumber();
                againstnumber += tv.getAgainstnumber();
                supportnumber += tv.getSupportnumber();
            }
            String[] records = Key.toString().split("\t");
            // Source ids: 1 Youku, 2 Sohu, 3 Tudou, 4 iQiyi, 5 Xunlei Kankan
            String source = records[1];   // media source
            m_key.set(records[0]);
            m_value.set(daynumber + "\t" + collectnumber + "\t" + commentnumber
                    + "\t" + againstnumber + "\t" + supportnumber);
            if (source.equals("1")) {
                mos.write("youku", m_key, m_value);
            } else if (source.equals("2")) {
                mos.write("souhu", m_key, m_value);
            } else if (source.equals("3")) {
                mos.write("tudou", m_key, m_value);
            } else if (source.equals("4")) {
                mos.write("aiqiyi", m_key, m_value);
            } else if (source.equals("5")) {
                mos.write("xunlei", m_key, m_value);
            }
        }

        // Close MultipleOutputs, i.e. close its RecordWriters; there is a whole set of
        // them, because reduce() is called many times on this task.
        protected void cleanup(Context context) throws IOException, InterruptedException {
            mos.close();
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();          // configuration object
        Path mypath = new Path(args[1]);
        FileSystem hdfs = mypath.getFileSystem(conf);
        if (hdfs.isDirectory(mypath)) {                    // delete the output path if it already exists
            hdfs.delete(mypath, true);
        }

        Job job = new Job(conf, "tvplay");                 // construct the job
        job.setJarByClass(TVPlayCount.class);              // main class
        job.setMapperClass(TVPlayMapper.class);            // Mapper
        job.setMapOutputKeyClass(Text.class);              // map output key type
        job.setMapOutputValueClass(TVPlayData.class);      // map output value type
        job.setInputFormatClass(TVPlayInputFormat.class);  // custom input format
        job.setReducerClass(TVPlayReducer.class);          // Reducer
        job.setOutputKeyClass(Text.class);                 // reduce output key type
        job.setOutputValueClass(Text.class);               // reduce output value type

        // Named outputs; the name becomes part of the output file name.
        MultipleOutputs.addNamedOutput(job, "youku", TextOutputFormat.class, Text.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "souhu", TextOutputFormat.class, Text.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "tudou", TextOutputFormat.class, Text.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "aiqiyi", TextOutputFormat.class, Text.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "xunlei", TextOutputFormat.class, Text.class, Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));    // input path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));  // output path
        job.waitForCompletion(true);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        String[] args0 = { "hdfs://master:9000/tvplay/", "hdfs://master:9000/tvplay/out" };
        int ec = ToolRunner.run(new Configuration(), new TVPlayCount(), args0);
        System.exit(ec);
    }
}

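Note that every record is written through MultipleOutputs in the reducer, so nothing goes through the normal reducer output (the run log further down shows Reduce output records=0), and the job still creates empty part-r-* files in the output directory. If you would rather not create them at all, one small optional tweak, sketched here under the assumption that the same Hadoop 2.x mapreduce API is used, is to register the base output lazily inside run():

    // Additional import at the top of TVPlayCount.java:
    import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;

    // Inside run(): the base output file is only created if something is actually
    // written to it, so only the named outputs (youku, souhu, tudou, aiqiyi, xunlei)
    // appear in the output directory.
    LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);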

TVPlayData.java

package com.dajiangtai.hadoop.tvplay;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * @author yangjun
 * @function custom writable value object
 */
public class TVPlayData implements WritableComparable {
    private int daynumber;
    private int collectnumber;
    private int commentnumber;
    private int againstnumber;
    private int supportnumber;

    public TVPlayData() {
    }

    public void set(int daynumber, int collectnumber, int commentnumber,
                    int againstnumber, int supportnumber) {
        this.daynumber = daynumber;
        this.collectnumber = collectnumber;
        this.commentnumber = commentnumber;
        this.againstnumber = againstnumber;
        this.supportnumber = supportnumber;
    }

    public int getDaynumber() { return daynumber; }
    public void setDaynumber(int daynumber) { this.daynumber = daynumber; }

    public int getCollectnumber() { return collectnumber; }
    public void setCollectnumber(int collectnumber) { this.collectnumber = collectnumber; }

    public int getCommentnumber() { return commentnumber; }
    public void setCommentnumber(int commentnumber) { this.commentnumber = commentnumber; }

    public int getAgainstnumber() { return againstnumber; }
    public void setAgainstnumber(int againstnumber) { this.againstnumber = againstnumber; }

    public int getSupportnumber() { return supportnumber; }
    public void setSupportnumber(int supportnumber) { this.supportnumber = supportnumber; }

    @Override
    public void readFields(DataInput in) throws IOException {
        daynumber = in.readInt();
        collectnumber = in.readInt();
        commentnumber = in.readInt();
        againstnumber = in.readInt();
        supportnumber = in.readInt();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(daynumber);
        out.writeInt(collectnumber);
        out.writeInt(commentnumber);
        out.writeInt(againstnumber);
        out.writeInt(supportnumber);
    }

    @Override
    public int compareTo(Object o) {
        return 0;
    }
}
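compareTo() always returns 0 here, which is harmless because TVPlayData is only ever used as a map output value, so the framework never needs to order instances of it. If the class were ever promoted to a key it would need a real ordering (and matching hashCode/equals); a purely illustrative sketch, ordering by play count and then by favourites, might look like this:

    @Override
    public int compareTo(Object o) {
        TVPlayData other = (TVPlayData) o;
        // Illustrative ordering only: by daynumber first, then collectnumber.
        if (daynumber != other.daynumber) {
            return Integer.compare(daynumber, other.daynumber);
        }
        return Integer.compare(collectnumber, other.collectnumber);
    }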


TVPlayInputFormat.java

package com.dajiangtai.hadoop.tvplay;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

/**
 * @author yangjun
 * @function key/value input format
 */
public class TVPlayInputFormat extends FileInputFormat<Text, TVPlayData> {

    @Override
    public RecordReader<Text, TVPlayData> createRecordReader(InputSplit input,
            TaskAttemptContext context) throws IOException, InterruptedException {
        return new TVPlayRecordReader();
    }

    public class TVPlayRecordReader extends RecordReader<Text, TVPlayData> {
        public LineReader in;
        public Text lineKey;
        public TVPlayData lineValue;
        public Text line;

        @Override
        public void close() throws IOException {
            if (in != null) {
                in.close();
            }
        }

        @Override
        public Text getCurrentKey() throws IOException, InterruptedException {
            return lineKey;
        }

        @Override
        public TVPlayData getCurrentValue() throws IOException, InterruptedException {
            return lineValue;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return 0;
        }

        @Override
        public void initialize(InputSplit input, TaskAttemptContext context)
                throws IOException, InterruptedException {
            FileSplit split = (FileSplit) input;
            Configuration job = context.getConfiguration();
            Path file = split.getPath();
            FileSystem fs = file.getFileSystem(job);
            FSDataInputStream filein = fs.open(file);
            in = new LineReader(filein, job);
            line = new Text();
            lineKey = new Text();
            lineValue = new TVPlayData();
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            int linesize = in.readLine(line);
            if (linesize == 0)
                return false;
            String[] pieces = line.toString().split("\t");
            if (pieces.length != 7) {
                throw new IOException("Invalid record received");
            }
            // key: show name + source id; value: the five numeric columns
            lineKey.set(pieces[0] + "\t" + pieces[1]);
            lineValue.set(Integer.parseInt(pieces[2]), Integer.parseInt(pieces[3]),
                    Integer.parseInt(pieces[4]), Integer.parseInt(pieces[5]),
                    Integer.parseInt(pieces[6]));
            return true;
        }
    }
}
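nextKeyValue() makes the expected input layout explicit: each line of tvplay.txt must contain exactly 7 tab-separated fields, otherwise an IOException is thrown. The first two fields (the show name and the source id 1-5) become the map key, and the remaining five integers fill the TVPlayData value; judging by the field names, they are the play, favourite, comment, dislike and like counts. A hypothetical input line (the name and numbers below are made up purely to illustrate the layout, fields separated by tab characters) would look like:

    SomeTVShow	2	1084	368	256	35	470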

 

First, start the 3-node cluster.

 

Connect the development environment to HDFS on the 3-node cluster built locally.

 

The run output shown in the terminal; the program completes without errors:

 

2017-10-11 16:04:55,893 INFO [org.apache.hadoop.conf.Configuration.deprecation] - session.id is deprecated. Instead, use dfs.metrics.session-id
2017-10-11 16:04:55,899 INFO [org.apache.hadoop.metrics.jvm.JvmMetrics] - Initializing JVM Metrics with processName=JobTracker, sessionId=
2017-10-11 16:04:56,987 WARN [org.apache.hadoop.mapreduce.JobSubmitter] - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
2017-10-11 16:04:56,993 WARN [org.apache.hadoop.mapreduce.JobSubmitter] - No job jar file set.  User classes may not be found. See Job or Job#setJar(String).
2017-10-11 16:04:57,229 INFO [org.apache.hadoop.mapreduce.lib.input.FileInputFormat] - Total input paths to process : 1
2017-10-11 16:04:57,354 INFO [org.apache.hadoop.mapreduce.JobSubmitter] - number of splits:1
2017-10-11 16:04:57,426 INFO [org.apache.hadoop.conf.Configuration.deprecation] - user.name is deprecated. Instead, use mapreduce.job.user.name
2017-10-11 16:04:57,428 INFO [org.apache.hadoop.conf.Configuration.deprecation] - mapred.output.value.class is deprecated. Instead, use mapreduce.job.output.value.class
2017-10-11 16:04:57,429 INFO [org.apache.hadoop.conf.Configuration.deprecation] - mapred.mapoutput.value.class is deprecated. Instead, use mapreduce.map.output.value.class
2017-10-11 16:04:57,430 INFO [org.apache.hadoop.conf.Configuration.deprecation] - mapreduce.map.class is deprecated. Instead, use mapreduce.job.map.class
2017-10-11 16:04:57,430 INFO [org.apache.hadoop.conf.Configuration.deprecation] - mapred.job.name is deprecated. Instead, use mapreduce.job.name
2017-10-11 16:04:57,430 INFO [org.apache.hadoop.conf.Configuration.deprecation] - mapreduce.reduce.class is deprecated. Instead, use mapreduce.job.reduce.class
2017-10-11 16:04:57,431 INFO [org.apache.hadoop.conf.Configuration.deprecation] - mapreduce.inputformat.class is deprecated. Instead, use mapreduce.job.inputformat.class
2017-10-11 16:04:57,431 INFO [org.apache.hadoop.conf.Configuration.deprecation] - mapred.input.dir is deprecated. Instead, use mapreduce.input.fileinputformat.inputdir
2017-10-11 16:04:57,431 INFO [org.apache.hadoop.conf.Configuration.deprecation] - mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
2017-10-11 16:04:57,432 INFO [org.apache.hadoop.conf.Configuration.deprecation] - mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
2017-10-11 16:04:57,433 INFO [org.apache.hadoop.conf.Configuration.deprecation] - mapred.output.key.class is deprecated. Instead, use mapreduce.job.output.key.class
2017-10-11 16:04:57,434 INFO [org.apache.hadoop.conf.Configuration.deprecation] - mapred.mapoutput.key.class is deprecated. Instead, use mapreduce.map.output.key.class
2017-10-11 16:04:57,434 INFO [org.apache.hadoop.conf.Configuration.deprecation] - mapred.working.dir is deprecated. Instead, use mapreduce.job.working.dir
2017-10-11 16:04:58,164 INFO [org.apache.hadoop.mapreduce.JobSubmitter] - Submitting tokens for job: job_local300699497_0001
2017-10-11 16:04:58,336 WARN [org.apache.hadoop.conf.Configuration] - file:/tmp/hadoop-Administrator/mapred/staging/Administrator300699497/.staging/job_local300699497_0001/job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval;  Ignoring.
2017-10-11 16:04:58,337 WARN [org.apache.hadoop.conf.Configuration] - file:/tmp/hadoop-Administrator/mapred/staging/Administrator300699497/.staging/job_local300699497_0001/job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts;  Ignoring.
2017-10-11 16:04:58,864 WARN [org.apache.hadoop.conf.Configuration] - file:/tmp/hadoop-Administrator/mapred/local/localRunner/Administrator/job_local300699497_0001/job_local300699497_0001.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval;  Ignoring.
2017-10-11 16:04:58,865 WARN [org.apache.hadoop.conf.Configuration] - file:/tmp/hadoop-Administrator/mapred/local/localRunner/Administrator/job_local300699497_0001/job_local300699497_0001.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts;  Ignoring.
2017-10-11 16:04:58,904 INFO [org.apache.hadoop.mapreduce.Job] - The url to track the job: http://localhost:8080/
2017-10-11 16:04:58,906 INFO [org.apache.hadoop.mapreduce.Job] - Running job: job_local300699497_0001
2017-10-11 16:04:58,953 INFO [org.apache.hadoop.mapred.LocalJobRunner] - OutputCommitter set in config null
2017-10-11 16:04:58,984 INFO [org.apache.hadoop.mapred.LocalJobRunner] - OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
2017-10-11 16:04:59,233 INFO [org.apache.hadoop.mapred.LocalJobRunner] - Waiting for map tasks
2017-10-11 16:04:59,234 INFO [org.apache.hadoop.mapred.LocalJobRunner] - Starting task: attempt_local300699497_0001_m_000000_0
2017-10-11 16:04:59,451 INFO [org.apache.hadoop.yarn.util.ProcfsBasedProcessTree] - ProcfsBasedProcessTree currently is supported only on Linux.
2017-10-11 16:04:59,900 INFO [org.apache.hadoop.mapred.Task] -  Using ResourceCalculatorProcessTree : org.apache.hadoop.yarn.util.WindowsBasedProcessTree@1b9156ad
2017-10-11 16:04:59,908 INFO [org.apache.hadoop.mapred.MapTask] - Processing split: hdfs://master:9000/tvplay/tvplay.txt:0+10833923
2017-10-11 16:04:59,910 INFO [org.apache.hadoop.mapreduce.Job] - Job job_local300699497_0001 running in uber mode : false
2017-10-11 16:04:59,952 INFO [org.apache.hadoop.mapreduce.Job] -  map 0% reduce 0%
2017-10-11 16:04:59,987 INFO [org.apache.hadoop.mapred.MapTask] - Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
2017-10-11 16:05:00,170 INFO [org.apache.hadoop.mapred.MapTask] - (EQUATOR) 0 kvi 26214396(104857584)
2017-10-11 16:05:00,170 INFO [org.apache.hadoop.mapred.MapTask] - mapreduce.task.io.sort.mb: 100
2017-10-11 16:05:00,170 INFO [org.apache.hadoop.mapred.MapTask] - soft limit at 83886080
2017-10-11 16:05:00,170 INFO [org.apache.hadoop.mapred.MapTask] - bufstart = 0; bufvoid = 104857600
2017-10-11 16:05:00,170 INFO [org.apache.hadoop.mapred.MapTask] - kvstart = 26214396; length = 6553600
2017-10-11 16:05:03,511 INFO [org.apache.hadoop.mapred.LocalJobRunner] -
2017-10-11 16:05:03,545 INFO [org.apache.hadoop.mapred.MapTask] - Starting flush of map output
2017-10-11 16:05:03,545 INFO [org.apache.hadoop.mapred.MapTask] - Spilling map output
2017-10-11 16:05:03,545 INFO [org.apache.hadoop.mapred.MapTask] - bufstart = 0; bufend = 12652147; bufvoid = 104857600
2017-10-11 16:05:03,545 INFO [org.apache.hadoop.mapred.MapTask] - kvstart = 26214396(104857584); kvend = 24882940(99531760); length = 1331457/6553600
2017-10-11 16:05:04,913 INFO [org.apache.hadoop.mapred.MapTask] - Finished spill 0
2017-10-11 16:05:04,924 INFO [org.apache.hadoop.mapred.Task] - Task:attempt_local300699497_0001_m_000000_0 is done. And is in the process of committing
2017-10-11 16:05:04,998 INFO [org.apache.hadoop.mapred.LocalJobRunner] - map
2017-10-11 16:05:04,998 INFO [org.apache.hadoop.mapred.Task] - Task 'attempt_local300699497_0001_m_000000_0' done.
2017-10-11 16:05:04,998 INFO [org.apache.hadoop.mapred.LocalJobRunner] - Finishing task: attempt_local300699497_0001_m_000000_0
2017-10-11 16:05:04,999 INFO [org.apache.hadoop.mapred.LocalJobRunner] - Map task executor complete.
2017-10-11 16:05:05,047 INFO [org.apache.hadoop.yarn.util.ProcfsBasedProcessTree] - ProcfsBasedProcessTree currently is supported only on Linux.
2017-10-11 16:05:05,366 INFO [org.apache.hadoop.mapred.Task] -  Using ResourceCalculatorProcessTree : org.apache.hadoop.yarn.util.WindowsBasedProcessTree@fba110e
2017-10-11 16:05:05,417 INFO [org.apache.hadoop.mapred.Merger] - Merging 1 sorted segments
2017-10-11 16:05:05,484 INFO [org.apache.hadoop.mapred.Merger] - Down to the last merge-pass, with 1 segments left of total size: 13317874 bytes
2017-10-11 16:05:05,485 INFO [org.apache.hadoop.mapred.LocalJobRunner] -
2017-10-11 16:05:05,578 INFO [org.apache.hadoop.conf.Configuration.deprecation] - mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
2017-10-11 16:05:05,978 INFO [org.apache.hadoop.mapreduce.Job] -  map 100% reduce 0%
2017-10-11 16:05:07,669 INFO [org.apache.hadoop.mapred.Task] - Task:attempt_local300699497_0001_r_000000_0 is done. And is in the process of committing
2017-10-11 16:05:07,675 INFO [org.apache.hadoop.mapred.LocalJobRunner] -
2017-10-11 16:05:07,675 INFO [org.apache.hadoop.mapred.Task] - Task attempt_local300699497_0001_r_000000_0 is allowed to commit now
2017-10-11 16:05:07,716 INFO [org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter] - Saved output of task 'attempt_local300699497_0001_r_000000_0' to hdfs://master:9000/tvplay/out/_temporary/0/task_local300699497_0001_r_000000
2017-10-11 16:05:07,717 INFO [org.apache.hadoop.mapred.LocalJobRunner] - reduce > reduce
2017-10-11 16:05:07,717 INFO [org.apache.hadoop.mapred.Task] - Task 'attempt_local300699497_0001_r_000000_0' done.
2017-10-11 16:05:07,978 INFO [org.apache.hadoop.mapreduce.Job] -  map 100% reduce 100%
2017-10-11 16:05:07,979 INFO [org.apache.hadoop.mapreduce.Job] - Job job_local300699497_0001 completed successfully
2017-10-11 16:05:08,015 INFO [org.apache.hadoop.mapreduce.Job] - Counters: 32
    File System Counters
        FILE: Number of bytes read=13318207
        FILE: Number of bytes written=27040248
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=21667846
        HDFS: Number of bytes written=195234
        HDFS: Number of read operations=17
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=11
    Map-Reduce Framework
        Map input records=332865
        Map output records=332865
        Map output bytes=12652147
        Map output materialized bytes=13317883
        Input split bytes=101
        Combine input records=0
        Combine output records=0
        Reduce input groups=5741
        Reduce shuffle bytes=0
        Reduce input records=332865
        Reduce output records=0
        Spilled Records=665730
        Shuffled Maps =0
        Failed Shuffles=0
        Merged Map outputs=0
        GC time elapsed (ms)=74
        CPU time spent (ms)=0
        Physical memory (bytes) snapshot=0
        Virtual memory (bytes) snapshot=0
        Total committed heap usage (bytes)=705691648
    File Input Format Counters
        Bytes Read=10833923
    File Output Format Counters
        Bytes Written=0

 

Check the output results on HDFS.
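With a single reducer, MultipleOutputs normally writes each named output to a file called <name>-r-00000 (youku-r-00000, souhu-r-00000, and so on) alongside the empty default part-r-00000. As a small hypothetical helper (not part of the original post; the file name youku-r-00000 is an assumption about the usual naming pattern), the first few lines of the Youku output can be printed straight from HDFS:

package com.dajiangtai.hadoop.tvplay;

import java.io.BufferedReader;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical helper: print the first lines of the "youku" named output so the
// result can be checked without leaving the IDE.
public class PrintYoukuOutput {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path("hdfs://master:9000/tvplay/out/youku-r-00000");
        FileSystem fs = path.getFileSystem(conf);
        try (BufferedReader reader =
                new BufferedReader(new InputStreamReader(fs.open(path), "UTF-8"))) {
            String line;
            int count = 0;
            while ((line = reader.readLine()) != null && count++ < 10) {
                // Each line: show name \t plays \t favourites \t comments \t dislikes \t likes
                System.out.println(line);
            }
        }
    }
}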

Reposted from: https://www.cnblogs.com/braveym/p/7643551.html
