博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
python使用hbase
阅读量:6276 次
发布时间:2019-06-22

本文共 2390 字,大约阅读时间需要 7 分钟。

#coding:utf-8__author__ = 'similarface'from multiprocessing import Processimport happybaseimport osimport reimport hashlibimport multiprocessingfrom multiprocessing import Queuebasedir="/tmp/t8"filterpath="/Users/similarface/Documents/20170303Morgene999ProductFullSNP.txt"snpkey={}pattern_barcode= re.compile(r'[0-9]{3}[-][0-9]{4}[-][0-9]{4}')pattern_ls=re.compile(r'\s+')def func(filepath,snpkey):    conn=happybase.Connection(host='192.168.30.250')    table=conn.table('chipdata')    barcodes=pattern_barcode.findall(filepath)    barcode=barcodes[0]    i=0    all=0    with open(filepath,'rb') as foper:        for line in foper:            try:                lines=pattern_ls.split(line.strip())                chr=lines[1]                pos=lines[2]                key=chr+":"+pos                #print key                if key in snpkey:                    all=all+1                    m = hashlib.md5()                    m.update(pos.strip())                    rowkey = m.hexdigest()+":"+chr.upper()                    dictkey='d:'+barcode                    columns=[dictkey]                    rows_as_dict = dict(table.row(rowkey,columns))                    if rows_as_dict[dictkey]==lines[3]:                        i=i+1            except Exception,e:                pass    print barcode+":"+format((i+0.0)/all,'0.1%')+"match"+str(i)        #q.put(barcode+":"+format((i+0.0)/all,'0.1%'))    conn.close()def read(q):    while True:        value = q.get(True)        print 'Get %s from queue.' % valueif __name__ == "__main__":    pool = multiprocessing.Pool(processes = 3)    snpkey={}    q = Queue()    pattern_s=re.compile(r'\s+')    with open(filterpath,'rb') as oper:        for line in oper:            if line.strip()!="":                lines=pattern_s.split(line.strip())                snpkey[':'.join(lines[0:2])]=""    # pr = Process(target=read, args=(q,))    # pr.start()    for filename in os.listdir(basedir):        if filename.endswith("snp"):            filterpath=os.path.join(basedir,filename)            pool.apply_async(func, args=(filterpath,snpkey))   #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去    print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"    pool.close()    pool.join()   #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束    print "Sub-process(es) done."    #pr.terminate()

 

转载于:https://www.cnblogs.com/similarface/p/7053282.html

你可能感兴趣的文章
Swift-EasingAnimation
查看>>
[翻译] BKZoomView
查看>>
C++类设计的一些心得
查看>>
tableVIew删除时的delete按钮被挡住时重写的方法
查看>>
读cookie中文字符乱码问题
查看>>
招募译者翻译并发数据结构
查看>>
普通表转换为分区表
查看>>
Java 容器 & 泛型:三、HashSet,TreeSet 和 LinkedHashSet比较
查看>>
性能优化总结(六):预加载、聚合SQL应用实例
查看>>
http缓存知识
查看>>
Go 时间交并集小工具
查看>>
iOS 多线程总结
查看>>
webpack是如何实现前端模块化的
查看>>
TCP的三次握手四次挥手
查看>>
关于redis的几件小事(六)redis的持久化
查看>>
package.json
查看>>
webpack4+babel7+eslint+editorconfig+react-hot-loader 搭建react开发环境
查看>>
Maven 插件
查看>>
初探Angular6.x---进入用户编辑模块
查看>>
计算机基础知识复习
查看>>