⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 netflickcanopydatamapper.java

📁 一个简单的mapreduce实现
💻 JAVA
字号:
//// Author - Jack Hebert (jhebert@cs.washington.edu)// Copyright 2007// Distributed under GPLv3//import java.io.IOException;import java.util.*;import java.lang.StringBuilder;import org.apache.hadoop.io.*;import org.apache.hadoop.fs.*;		import org.apache.hadoop.io.Text;import org.apache.hadoop.io.Writable;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.mapred.MapReduceBase;import org.apache.hadoop.mapred.Mapper;import org.apache.hadoop.mapred.OutputCollector;import org.apache.hadoop.mapred.Reporter;import org.apache.hadoop.mapred.JobConf;public class NetflickCanopyDataMapper extends MapReduceBase implements Mapper {	private static ArrayList<NetflixMovie> capopyCenters = new ArrayList<NetflixMovie>();	private boolean done = false;	private int count = 0;		// Load the canopy centers into memory.	public void configure(JobConf conf) {			     try {	    	 if(done)	    		 return;	    	 else	    		 done = true;             FileSystem fs = FileSystem.get(conf);             Path path = new Path("/user/jhebert/out2/part-00000");             SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);             Text key = new Text();             Text value = new Text();             while(true) {            	 reader.next(key, value);            	 if(key.toString().equals(""))            		 break;     			 NetflixMovie curr = new NetflixMovie(key.toString(), value.toString());     			 capopyCenters.add(curr);     			 key.set("");             }     } catch (IOException e) {              e.printStackTrace();     }	}		// This needs to read through the netflix data and for each point if it belongs	// to a canopy, then emit it to it.	// Final form will be CanopyID1:CanopyID2:... MovieID:movieVector	// where movieVector contains the userID,rating pairs.	public void map(WritableComparable key, Writable values,			OutputCollector output, Reporter reporter) throws IOException {		count += 1;		String movie_id = ((Text)key).toString();		String data = ((Text)values).toString();		String status = count+":"+capopyCenters.size()+":"+movie_id;		reporter.setStatus(status);		NetflixMovie curr = new NetflixMovie(movie_id, data);		boolean emitted = false;		StringBuilder builder = new StringBuilder();		for(NetflixMovie center: capopyCenters) {			int matchCount = curr.MatchCount(center, 2);			if(matchCount > 2) {				emitted |= true;				if(builder.length()>0)					builder.append(":");				builder.append(center.movie_id);			}		}		if(emitted) {			builder.append(":");			builder.append(movie_id);			builder.append(":");			builder.append(data);			String to_emit = builder.toString();			output.collect(new Text(movie_id), new Text(to_emit));		}else			reporter.setStatus("Did not emit: "+movie_id);	}}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -