亚洲免费在线-亚洲免费在线播放-亚洲免费在线观看-亚洲免费在线观看视频-亚洲免费在线看-亚洲免费在线视频

分布式計算開源框架Hadoop入門實踐(三)

系統 2581 0

分布式計算開源框架Hadoop入門實踐(三)

Hadoop基本流程

分布式計算開源框架Hadoop入門實踐(三)

分布式計算開源框架Hadoop入門實踐(三)

一個圖片太大了,只好分割成為兩部分。根據流程圖來說一下具體一個任務執行的情況。

  1. 在分布式環境中客戶端創建任務并提交。
  2. InputFormat做Map前的預處理,主要負責以下工作:
    1. 驗證輸入的格式是否符合JobConfig的輸入定義,這個在實現Map和構建Conf的時候就會知道,不定義可以是Writable的任意子類。
    2. 將input的文件切分為邏輯上的輸入InputSplit,其實這就是在上面提到的在分布式文件系統中blocksize是有大小限制的,因此大文件會被劃分為多個block。
    3. 通過RecordReader來再次處理inputsplit為一組records,輸出給Map。(inputsplit只是邏輯切分的第一步,但是如何根據文件中的信息來切分還需要RecordReader來實現,例如最簡單的默認方式就是回車換行的切分)
  3. RecordReader處理后的結果作為Map的輸入,Map執行定義的Map邏輯,輸出處理后的key和value對應到臨時中間文件。
  4. Combiner可選擇配置,主要作用是在每一個Map執行完分析以后,在本地優先作Reduce的工作,減少在Reduce過程中的數據傳輸量。
  5. Partitioner可選擇配置,主要作用是在多個Reduce的情況下,指定Map的結果由某一個Reduce處理,每一個Reduce都會有單獨的輸出文件。(后面的代碼實例中有介紹使用場景)
  6. Reduce執行具體的業務邏輯,并且將處理結果輸出給OutputFormat。
  7. OutputFormat的職責是,驗證輸出目錄是否已經存在,同時驗證輸出結果類型是否如Config中配置,最后輸出Reduce匯總后的結果。

業務場景和代碼范例

業務場景描述: 可設定輸入和輸出路徑(操作系統的路徑非HDFS路徑),根據訪問日志分析某一個應用訪問某一個API的總次數和總流量,統計后分別輸出到兩個文件中。這里僅僅為了測試,沒有去細分很多類,將所有的類都歸并于一個類便于說明問題。

分布式計算開源框架Hadoop入門實踐(三)
測試代碼類圖

LogAnalysiser就是主類,主要負責創建、提交任務,并且輸出部分信息。內部的幾個子類用途可以參看流程中提到的角色職責。具體地看看幾個類和方法的代碼片斷:

LogAnalysiser::MapClass

    
      ??? public static class MapClass extends MapReduceBase
??? ??? implements Mapper<LongWritable, Text, Text, LongWritable> 
??? {
??? ??? public void map(LongWritable key, Text value, OutputCollector<Text, LongWritable> output, Reporter reporter)
??? ??? ??? ??? throws IOException
??? ??? {??? 
??? ??? ??? String line = value.toString();//沒有配置RecordReader,所以默認采用line的實現,key就是行號,value就是行內容
??? ??? ??? if (line == null || line.equals(""))
??? ??? ??? ??? return;
??? ??? ??? String[] words = line.split(",");
??? ??? ??? if (words == null || words.length < 8)
??? ??? ??? ??? return;
??? ??? ??? String appid = words[1];
??? ??? ??? String apiName = words[2];
??? ??? ??? LongWritable recbytes = new LongWritable(Long.parseLong(words[7]));
??? ??? ??? Text record = new Text();
??? ??? ??? record.set(new StringBuffer("flow::").append(appid)
??? ??? ??? ??? ??? ??? ??? .append("::").append(apiName).toString());
??? ??? ??? reporter.progress();
??? ??? ??? output.collect(record, recbytes);//輸出流量的統計結果,通過flow::作為前綴來標示。
??? ??? ??? record.clear();
??? ??? ??? record.set(new StringBuffer("count::").append(appid).append("::").append(apiName).toString());
??? ??? ??? output.collect(record, new LongWritable(1));//輸出次數的統計結果,通過count::作為前綴來標示
??? ??? }??? 
??? }


    
  

LogAnalysiser:: PartitionerClass

    
      ??? public static class PartitionerClass implements Partitioner<Text, LongWritable>
??? {
??? ??? public int getPartition(Text key, LongWritable value, int numPartitions)
??? ??? {
??? ??? ??? if (numPartitions >= 2)//Reduce 個數,判斷流量還是次數的統計分配到不同的Reduce
??? ??? ??? ??? if (key.toString().startsWith("flow::"))
??? ??? ??? ??? ??? return 0;
??? ??? ??? ??? else
??? ??? ??? ??? ??? return 1;
??? ??? ??? else
??? ??? ??? ??? return 0;
??? ??? }
??? ??? public void configure(JobConf job){}??? 
}
    
  

LogAnalysiser:: CombinerClass

參看ReduceClass,通常兩者可以使用一個,不過這里有些不同的處理就分成了兩個。在ReduceClass中藍色的行表示在CombinerClass中不存在。

LogAnalysiser:: ReduceClass

    
      ??? public static class ReduceClass extends MapReduceBase
??? ??? implements Reducer<Text, LongWritable,Text, LongWritable> 
??? {
??? ??? public void reduce(Text key, Iterator<LongWritable> values,
??? ??? ??? ??? OutputCollector<Text, LongWritable> output, Reporter reporter)throws IOException
??? ??? {
??? ??? ??? Text newkey = new Text();
??? ??? ??? newkey.set(key.toString().substring(key.toString().indexOf("::")+2));
??? ??? ??? LongWritable result = new LongWritable();
??? ??? ??? long tmp = 0;
??? ??? ??? int counter = 0;
??? ??? ??? while(values.hasNext())//累加同一個key的統計結果
??? ??? ??? {
??? ??? ??? ??? tmp = tmp + values.next().get();
??? ??? ??? ??? 
??? ??? ??? ??? counter = counter +1;//擔心處理太久,JobTracker長時間沒有收到報告會認為TaskTracker已經失效,因此定時報告一下
??? ??? ??? ??? if (counter == 1000)
??? ??? ??? ??? {
??? ??? ??? ??? ??? counter = 0;
??? ??? ??? ??? ??? reporter.progress();
??? ??? ??? ??? }
??? ??? ??? }
??? ??? ??? result.set(tmp);
??? ??? ??? output.collect(newkey, result);//輸出最后的匯總結果
??? ??? }??? 
??? }
    
  

LogAnalysiser

    
      	public static void main(String[] args)
	{
		try
		{
			run(args);
		} catch (Exception e)
		{
			e.printStackTrace();
		}
	}
	public static void run(String[] args) throws Exception
	{
		if (args == null || args.length <2)
		{
			System.out.println("need inputpath and outputpath");
			return;
		}
		String inputpath = args[0];
		String outputpath = args[1];
		String shortin = args[0];
		String shortout = args[1];
		if (shortin.indexOf(File.separator) >= 0)
			shortin = shortin.substring(shortin.lastIndexOf(File.separator));
		if (shortout.indexOf(File.separator) >= 0)
			shortout = shortout.substring(shortout.lastIndexOf(File.separator));
		SimpleDateFormat formater = new SimpleDateFormat("yyyy.MM.dd");
		shortout = new StringBuffer(shortout).append("-")
			.append(formater.format(new Date())).toString();
		
		
		if (!shortin.startsWith("/"))
			shortin = "/" + shortin;
		if (!shortout.startsWith("/"))
			shortout = "/" + shortout;
		shortin = "/user/root" + shortin;
		shortout = "/user/root" + shortout;			
		File inputdir = new File(inputpath);
		File outputdir = new File(outputpath);
		if (!inputdir.exists() || !inputdir.isDirectory())
		{
			System.out.println("inputpath not exist or isn't dir!");
			return;
		}
		if (!outputdir.exists())
		{
			new File(outputpath).mkdirs();
		}
		
		JobConf conf = new JobConf(new Configuration(),LogAnalysiser.class);//構建Config
		FileSystem fileSys = FileSystem.get(conf);
		fileSys.copyFromLocalFile(new Path(inputpath), new Path(shortin));//將本地文件系統的文件拷貝到HDFS中

		conf.setJobName("analysisjob");
		conf.setOutputKeyClass(Text.class);//輸出的key類型,在OutputFormat會檢查
		conf.setOutputValueClass(LongWritable.class); //輸出的value類型,在OutputFormat會檢查
		conf.setMapperClass(MapClass.class);
		conf.setCombinerClass(CombinerClass.class);
		conf.setReducerClass(ReduceClass.class);
		conf.setPartitionerClass(PartitionerClass.class);
		conf.set("mapred.reduce.tasks", "2");//強制需要有兩個Reduce來分別處理流量和次數的統計
		FileInputFormat.setInputPaths(conf, shortin);//hdfs中的輸入路徑
		FileOutputFormat.setOutputPath(conf, new Path(shortout));//hdfs中輸出路徑
		
		Date startTime = new Date();
	    	System.out.println("Job started: " + startTime);
	    	JobClient.runJob(conf);
	    	Date end_time = new Date();
	    	System.out.println("Job ended: " + end_time);
	    	System.out.println("The job took " + (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
	    	//刪除輸入和輸出的臨時文件
		fileSys.copyToLocalFile(new Path(shortout),new Path(outputpath));
		fileSys.delete(new Path(shortin),true);
		fileSys.delete(new Path(shortout),true);
	}


    
  

以上的代碼就完成了所有的邏輯性代碼,然后還需要一個注冊驅動類來注冊業務Class為一個可標示的命令,讓hadoop jar可以執行。

    
      public class ExampleDriver {
? public static void main(String argv[]){
??? ProgramDriver pgd = new ProgramDriver();
??? try {
????? pgd.addClass("analysislog", LogAnalysiser.class, "A map/reduce program that analysis log .");
????? pgd.driver(argv);
??? }
??? catch(Throwable e){
????? e.printStackTrace();
??? }
? }
}
    
  

將代碼打成jar,并且設置jar的mainClass為ExampleDriver這個類。在分布式環境啟動以后執行如下語句:

    
      hadoop jar analysiser.jar analysislog /home/wenchu/test-in /home/wenchu/test-out
    
  

在/home/wenchu/test-in中是需要分析的日志文件,執行后就會看見整個執行過程,包括了Map和Reduce的進度。執行完畢會 在/home/wenchu/test-out下看到輸出的內容。有兩個文件:part-00000和part-00001分別記錄了統計后的結果。 如果需要看執行的具體情況,可以看在輸出目錄下的_logs/history/xxxx_analysisjob,里面羅列了所有的Map,Reduce 的創建情況以及執行情況。在運行期也可以通過瀏覽器來查看Map,Reduce的情況:http://MasterIP:50030 /jobtracker.jsp

Hadoop集群測試

首先這里使用上面的范例作為測試,也沒有做太多的優化配置,這個測試結果只是為了看看集群的效果,以及一些參數配置的影響。

文件復制數為1,blocksize 5M

Slave數 處理記錄數(萬條) 執行時間(秒)
2 95 38
2 950 337
4 95 24
4 950 178
6 95 21
6 950 114

Blocksize 5M

Slave數 處理記錄數(萬條) 執行時間(秒)
2(文件復制數為1) 950 337
2(文件復制數為3) 950 339
6(文件復制數為1) 950 114
6(文件復制數為3) 950 117

文件復制數為1

Slave數 處理記錄數(萬條) 執行時間(秒)
6(blocksize 5M) 95 21
6(blocksize 77M) 95 26
4(blocksize 5M) 950 178
4(blocksize 50M) 950 54
6(blocksize 5M) 950 114
6(blocksize 50M) 950 44
6(blocksize 77M) 950 74

測試的數據結果很穩定,基本測幾次同樣條件下都是一樣。通過測試結果可以看出以下幾點:

  1. 機器數對于性能還是有幫助的(等于沒說^_^)。
  2. 文件復制數的增加只對安全性有幫助,但是對于性能沒有太多幫助。而且現在采取的是將操作系統文件拷貝到HDFS中,所以備份多了,準備的時間很長。
  3. blocksize對于性能影響很大,首先如果將block劃分的太小,那么將會增加job的數量,同時也增加了協作的代價,降低了性能,但是配置的太大也會讓job不能最大化并行處理。所以這個值的配置需要根據數據處理的量來考慮。
  4. 最后就是除了這個表里面列出來的結果,應該去仔細看輸出目錄中的_logs/history中的xxx_analysisjob這個文件,里面記錄了全部的執行過程以及讀寫情況。這個可以更加清楚地了解哪里可能會更加耗時。

隨想

“云計算”熱的燙手,就和SAAS、Web2及SNS等一樣,往往都是在搞概念,只有真正踏踏實實的大型互聯網公司,才會投入人力物力去研究符合自 己的分布式計算。其實當你的數據量沒有那么大的時候,這種分布式計算也就僅僅只是一個玩具而已,只有在真正解決問題的過程中,它深層次的問題才會被挖掘出 來。

這三篇文章(分布式計算開源框架Hadoop介紹,Hadoop中的集群配置和使用技巧)僅僅是為了給對分布式計算有興趣的朋友拋個磚,要想真的掘到金 子,那么就踏踏實實的去用、去想、去分析?;蛘咦约阂矔M一步地去研究框架中的實現機制,在解決自己問題的同時,也能夠貢獻一些什么。

前幾日看到有人跪求成為架構師的方式,看了有些可悲,有些可笑,其實有多少架構師知道什么叫做架構?架構師的職責是什么?與其追求這么一個名號,還不如踏踏實實地做塊石頭沉到水底。要知道,積累和沉淀的過程就是一種成長。

分布式計算開源框架Hadoop入門實踐(三)


更多文章、技術交流、商務合作、聯系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦?。?!

發表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 黄黄网| 在线欧美视频免费观看国产 | 亚洲 国产 图片 | 日日夜夜嗷嗷叫 | 2019精品国产品免费观看 | 亚洲精品mv在线观看 | 国产九九精品 | 欧美特黄级乱色毛片 | www.国产福利视频.com | 欧美在线成人免费国产 | 韩日精品在线 | 国产小视频国产精品 | 二区国产| 美女天天操 | 老司机福利在线播放 | 国产免费无遮挡精品视频 | 亚洲综合视频 | 久操视频在线播放 | 日本xxxwww在线观看免费 | 成在线人永久免费播放视频 | 亚洲欧美日韩国产色另类 | 国产精品探花一区在线观看 | 国产亚洲天堂 | 国产日韩欧美亚洲综合在线 | 福利视频中文在线观看 | 亚洲欧美色综合精品 | 激情网址大全 | 亚洲美女视频网址 | 狠狠狠色丁香婷婷综合久久五月 | 亚洲va欧美va人人爽夜夜嗨 | 久青草网站 | 69色视频日韩在线视频 | 日本一级毛片不卡免费 | 成人性色生活片免费看爆迷你毛片 | 日本精品久久久中文字幕 | 中文国产成人精品久久96 | 亚洲三级在线免费观看 | 国产精品一级 | 日本免费一区二区三区 | 高清影院|精品秒播3 | 久久99国产精品久久 |