Testing Hadoop HDFS and MapReduce

MapReduce is Hadoop's core programming framework. I don't fully understand it yet, so for now I am just following the steps found online and practicing, much like writing Hello World as the first program when learning a new language. A previous post covered installing Hadoop; this one tests HDFS and MapReduce.

HDFS Test


Create a new directory in HDFS and copy a local file into it:
cd <hadoop path>
bin/hadoop dfs -mkdir /tmp/firefoxbug
bin/hadoop dfs -put /opt/run.log /tmp/firefoxbug
bin/hadoop dfs -ls /tmp/firefoxbug

You can browse the file system on port 50070 of the Hadoop master and drill down into each node. With the default configuration here, the file is replicated to the two slaves. The block size is 64 MB: a file larger than 64 MB is split into multiple blocks, while a file smaller than that still occupies a single block.
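Besides the web UI, the block and replica layout can also be checked from the command line with hadoop fsck. A minimal check, assuming the file was uploaded to /tmp/firefoxbug/run.log as above:

# Show how run.log is split into blocks and where each replica lives
bin/hadoop fsck /tmp/firefoxbug/run.log -files -blocks -locations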

MapReduce Test


Map runs first, then Reduce. The example below implements word count: it counts how many times each word appears in /opt/run.log.
mapper.py simply splits each line into words and emits a (word, 1) pair for every occurrence. For example, if the file content is
hello hadoop hadoop fine
then the output of mapper.py is
(hello,1)
(hadoop,1)
(hadoop,1)
(fine,1)
The pairs are then sorted so that identical words end up next to each other; the actual counting happens in the reducer.
#!/usr/bin/env python

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
	# remove leading and trailing whitespace
	line = line.strip()
	# split the line into words
	words = line.split()
	# increase counters
	for word in words:
		# write the results to STDOUT (standard output);
		# what we output here will be the input for the
		# Reduce step, i.e. the input for reducer.py
		#
		# tab-delimited; the trivial word count is 1
		print '%s\t%s' % (word, 1)
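Before handing mapper.py to Hadoop, it is easy to sanity-check it locally. A quick test using the example line from above:

# Each word should come back on its own line as "word<TAB>1"
echo "hello hadoop hadoop fine" | python mapper.py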

reducer.py aggregates the sorted map output coming from multiple machines. For example, if the partial results from two machines are
(hadoop,1),(hello,2) and (hadoop,2),(fine,1)
then the reduced result is (hadoop,3),(hello,2),(fine,1)

#!/usr/bin/env python

from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
	# remove leading and trailing whitespace
	line = line.strip()

	# parse the input we got from mapper.py
	word, count = line.split('\t', 1)

	# convert count (currently a string) to int
	try:
		count = int(count)
	except ValueError:
		# count was not a number, so silently
		# ignore/discard this line
		continue

	# this IF-switch only works because Hadoop sorts map output
	# by key (here: word) before it is passed to the reducer
	if current_word == word:
		current_count += count
	else:
		if current_word:
			# write result to STDOUT
			print '%s\t%s' % (current_word, current_count)
		current_count = count
		current_word = word

# do not forget to output the last word if needed!
if current_word == word:
	print '%s\t%s' % (current_word, current_count)
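reducer.py can be tested locally the same way, as long as its input is sorted by key first, which is exactly what Hadoop guarantees between the map and reduce phases. A small end-to-end check on one line of text:

# Simulate the shuffle with sort, then aggregate with the reducer
echo "hello hadoop hadoop fine" | python mapper.py | sort | python reducer.py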

First, measure how long a plain shell pipeline takes; /opt/run.log is roughly 80 MB.

time cat /opt/run.log | python mapper.py | sort -n | python reducer.py

Now run the same job with Hadoop MapReduce. The number of map tasks is determined by the input splits (here it happens to equal the number of machines), and the number of reduce tasks can be set explicitly; four are specified here.

bin/hadoop jar ./contrib/streaming/hadoop-streaming-1.1.2.jar  -mapper mapper.py -file mapper.py -reducer reducer.py -file reducer.py -input /tmp/firefoxbug/run.log -output run.log.hadoop -jobconf mapred.reduce.tasks=4

13/11/29 19:32:33 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.
packageJobJar: [mapper.py, reducer.py, /tmp/hadoop/hadoop-unjar1011643559835824640/] [] /tmp/streamjob1255129713070443511.jar tmpDir=null
13/11/29 19:32:34 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/11/29 19:32:34 WARN snappy.LoadSnappy: Snappy native library not loaded
13/11/29 19:32:34 INFO mapred.FileInputFormat: Total input paths to process : 1
13/11/29 19:32:34 INFO streaming.StreamJob: getLocalDirs(): [/tmp/hadoop/mapred/local]
13/11/29 19:32:34 INFO streaming.StreamJob: Running job: job_201311291850_0003
13/11/29 19:32:34 INFO streaming.StreamJob: To kill this job, run:
13/11/29 19:32:34 INFO streaming.StreamJob: /root/hadoop-1.1.2/libexec/../bin/hadoop job  -Dmapred.job.tracker=master.hadoop:9001 -kill job_201311291850_0003
13/11/29 19:32:34 INFO streaming.StreamJob: Tracking URL: http://master.hadoop:50030/jobdetails.jsp?jobid=job_201311291850_0003
13/11/29 19:32:35 INFO streaming.StreamJob:  map 0%  reduce 0%
13/11/29 19:32:43 INFO streaming.StreamJob:  map 83%  reduce 0%
13/11/29 19:32:46 INFO streaming.StreamJob:  map 92%  reduce 0%
13/11/29 19:32:47 INFO streaming.StreamJob:  map 100%  reduce 0%
13/11/29 19:32:55 INFO streaming.StreamJob:  map 100%  reduce 17%
13/11/29 19:33:04 INFO streaming.StreamJob:  map 100%  reduce 67%
13/11/29 19:33:07 INFO streaming.StreamJob:  map 100%  reduce 72%
13/11/29 19:33:10 INFO streaming.StreamJob:  map 100%  reduce 77%
13/11/29 19:33:13 INFO streaming.StreamJob:  map 100%  reduce 80%
13/11/29 19:33:16 INFO streaming.StreamJob:  map 100%  reduce 89%
13/11/29 19:33:19 INFO streaming.StreamJob:  map 100%  reduce 99%
13/11/29 19:33:22 INFO streaming.StreamJob:  map 100%  reduce 100%
13/11/29 19:33:23 INFO streaming.StreamJob: Job complete: job_201311291850_0003
13/11/29 19:33:23 INFO streaming.StreamJob: Output: run.log.hadoop

Comparing the two: the plain Python pipeline took about one minute, while the Hadoop MapReduce job finished in 38 seconds, so one can expect MapReduce to pull further ahead as the file grows. As for the results: if no full path is specified, the output goes to /user/root/run.log.hadoop by default, and it is split into files such as part-00000 and part-00001. The detailed mechanics behind this workflow still need further study.
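To inspect the job output on HDFS, here is a small sketch using the default output path mentioned above (the merged local file name is just an example):

# List the part files and print their contents
bin/hadoop dfs -ls /user/root/run.log.hadoop
bin/hadoop dfs -cat /user/root/run.log.hadoop/part-*
# Or merge all part files into a single local file
bin/hadoop dfs -getmerge /user/root/run.log.hadoop /opt/run.log.wordcount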
