import random
# Mapper 基类的定义可参考上一篇博客
class PercentSampleMapper(Mapper):
    """Mapper that keeps each input value with a fixed probability.

    For every record a fresh ``random.random()`` draw is compared against
    ``percentage``; the record's value is emitted (with a ``None`` key) only
    when the draw is strictly smaller.

    ``Mapper`` is defined outside this snippet; this class relies on it for
    iteration over the input and for ``emit``.
    """

    def __init__(self, *args, **kwargs):
        """Store the sampling rate and forward everything else to ``Mapper``.

        Args:
            *args: Positional arguments passed through to ``Mapper.__init__``.
            **kwargs: Must contain ``percentage``, the threshold each random
                draw is compared against. It is removed from ``kwargs``
                before the remaining keywords are passed on.

        Raises:
            KeyError: If ``percentage`` is not supplied as a keyword.
        """
        # Pop first so the base class never sees the extra keyword.
        rate = kwargs.pop("percentage")
        self.percentage = rate
        super(PercentSampleMapper, self).__init__(*args, **kwargs)

    def map(self):
        """Emit the value of every record whose random draw is below the rate."""
        # Iterating the mapper yields pairs; only the second element is used.
        for _key, record in self:
            selected = random.random() < self.percentage
            if selected:
                self.emit(None, record)
if __name__ == '__main__':
    # `sys` is not imported at the top of this snippet, so import it here
    # to keep the script entry point self-contained.
    import sys

    # Sample the records arriving on standard input at a 0.2 rate.
    mapper = PercentSampleMapper(sys.stdin, percentage=0.2)
    # The method must actually be called; a bare `mapper.map` only looks the
    # attribute up and does nothing.
    mapper.map()
接着,使用一个恒等(identity)reducer 将各个 mapper 的输出原样合并即可。
2.随机抽取固定容量样本:
仍然为每条数据生成一个服从[0,1)上均匀分布的随机数,然后用一个最小堆维护容量为n的样本:堆中始终只保留随机数最大的n条数据。
import heapq
import random
class SampleMapper(Mapper):
    """Mapper that draws a fixed-size random sample of at most ``n`` records.

    Every record is tagged with a ``random.random()`` key and a min-heap
    keeps the ``n`` entries with the largest keys. Once the input is
    exhausted, each surviving ``(key, record)`` tuple is emitted with a
    ``None`` key.

    ``Mapper`` is defined outside this snippet; this class relies on it for
    iteration over the input and for ``emit``.
    """

    def __init__(self, n, *args, **kwargs):
        """Store the sample size and forward everything else to ``Mapper``.

        Args:
            n: Maximum number of records to keep in the sample.
            *args: Positional arguments passed through to ``Mapper.__init__``.
            **kwargs: Keyword arguments passed through to ``Mapper.__init__``.
        """
        self.n = n
        super(SampleMapper, self).__init__(*args, **kwargs)

    def map(self):
        """Emit up to ``n`` ``(random_key, record)`` tuples from the input."""
        # Start empty rather than pre-filling with integer zeros: the
        # placeholders cannot be compared with tuples on Python 3 and would
        # be emitted as fake samples when the input has fewer than n records.
        heap = []
        for value in self:
            entry = (random.random(), value)
            if len(heap) < self.n:
                # Still below capacity: keep every record seen so far.
                heapq.heappush(heap, entry)
            else:
                # heappushpop pushes the new entry and then discards the
                # smallest one, so the n largest random keys are retained.
                heapq.heappushpop(heap, entry)
        for item in heap:
            # Emit the whole (key, record) tuple, not just the record.
            self.emit(None, item)
# NOTE(review): this class exposes reduce() yet derives from Mapper; it
# presumably should extend a Reducer base class defined alongside Mapper --
# confirm before changing the base.
class SampleReducer(Mapper):
    """Merge sampled ``(random_key, record)`` entries into one sample of ``n``.

    Each incoming value is converted with ``make_tuple`` and pushed through a
    min-heap that keeps the ``n`` entries with the largest first element.
    The second element of every surviving entry is emitted with a ``None``
    key.
    """

    def __init__(self, n, *args, **kwargs):
        """Store the sample size and forward everything else to the base class.

        Args:
            n: Maximum number of records in the final sample.
            *args: Positional arguments passed through to the base ``__init__``.
            **kwargs: Keyword arguments passed through to the base ``__init__``.
        """
        self.n = n
        super(SampleReducer, self).__init__(*args, **kwargs)

    def reduce(self):
        """Emit the records belonging to the ``n`` largest entries seen."""
        # Start empty rather than pre-filling with integer zeros: a leftover
        # placeholder would break both the tuple comparisons and `item[1]`.
        heap = []
        # Iterating the reducer is expected to yield (key, values) pairs
        # where `values` is itself iterable -- TODO confirm against the base
        # class.
        for _, values in self:
            for value in values:
                # `make_tuple` is not defined in this snippet; presumably it
                # parses the emitted text back into a (random_key, record)
                # tuple -- verify where it is imported from.
                entry = make_tuple(value)
                if len(heap) < self.n:
                    heapq.heappush(heap, entry)
                else:
                    # Push the new entry, then drop the smallest one.
                    heapq.heappushpop(heap, entry)
        for item in heap:
            # Drop the random key; only the sampled record is emitted.
            self.emit(None, item[1])