当前位置: 代码迷 >> 综合 >> jython executor
  详细解决方案

jython executor

热度:31   发布时间:2023-12-21 16:57:34.0

jython executor

# -*- coding: utf-8 -*-
try:
    from webx import settings
except ImportError:
    # webx is not importable from the current sys.path: start at this file's
    # directory, climb five more directory levels, and add the web
    # application's source root found there to the import path.
    # NOTE(review): `settings` is not re-imported after the path fix --
    # confirm nothing relies on it being bound in this module.
    import os
    import sys

    p = os.path.dirname(__file__)
    for x in xrange(5):
        p = os.path.dirname(p)
    sys.path.append(p + '/src/main/webapp')

# This module depends on the java.util.concurrent framework, so it can only be
# started through Jython.
from java.util.concurrent import (ThreadFactory, Executors, ThreadPoolExecutor,
                                  TimeUnit, ArrayBlockingQueue)
from java.lang import Thread, Runnable
from java.util.concurrent.atomic import AtomicInteger

from task.index.solr import Solr
from people.service import userQueryService
from blog.service import blogQueryService
from tag.service import tagQueryService
from shopping.service import shoppingItemService

import logging

logger = logging.getLogger("opener")

# Responsible for creating the Solr clients.
class SolrFactory:
    """Creates one Solr client per core and hands them out by core name."""

    def __init__(self, server, timeout):
        """Build a client for every core listed in ``self.cores``.

        server  -- base URL; the core name and a trailing "/" are appended
                   to it by plain string concatenation.
        timeout -- passed through unchanged to every ``Solr`` client.
        """
        self.cores = ['people', 'feed', 'feed2', 'album']
        self.solrs = {}
        for core_name in self.cores:
            logger.info("init solr client " + core_name)
            core_url = server + core_name + "/"
            self.solrs[core_name] = Solr(core_url, timeout)

    def get_solr(self, core):
        """Return the client registered for ``core``, or None if there is none."""
        if core in self.solrs:
            return self.solrs[core]
        return None


write = 'http://192.168.173.6:7100/solr/'
solrFactory = SolrFactory(write, 20)

# Gives the threads used by the executors a name, so that they are easy to
# identify when inspecting threads with jstack.
# Reference: Executors.DefaultThreadFactory in the java.util.concurrent package.
class NamedThreadFactory(ThreadFactory):
    """ThreadFactory that names its threads "<prefix>-thread-<n>".

    The counter ``n`` starts at 1 and is incremented for every thread created
    by this factory instance.
    """

    def __init__(self, prefix, daemo=False):
        """Remember the naming prefix and the daemon flag for new threads.

        prefix -- leading part of every thread name.
        daemo  -- value passed to ``Thread.setDaemon`` for each new thread.
        """
        self.mPrefix = prefix + "-thread-"
        self.mDaemo = daemo
        # New threads join the thread group of the thread that built the factory.
        self.mGroup = Thread.currentThread().getThreadGroup()
        self.mThreadNum = AtomicInteger(1)

    def newThread(self, runnable):
        """Create (but do not start) a named thread that will run ``runnable``."""
        # The previous format string ended with a space, so every thread name
        # carried trailing whitespace; the name is now exactly prefix + number.
        name = '%s%s' % (self.mPrefix, self.mThreadNum.getAndIncrement())
        # A stack size of 0 leaves the choice of stack size to the JVM.
        ret = Thread(self.mGroup, runnable, name, 0)
        ret.setDaemon(self.mDaemo)
        return ret


# Use the CallerRuns policy, which indirectly slows down the rate at which new
# tasks are submitted.
rejectedExecutionHandler = ThreadPoolExecutor.CallerRunsPolicy()
buildExecutor =  ThreadPoolExecutor(5,5, 0L, TimeUnit.MILLISECONDS,ArrayBlockingQueue(1000), NamedThreadFactory("build"),rejectedExecutionHandler ); 
commitExecutor =  ThreadPoolExecutor(5,5, 0L, TimeUnit.MILLISECONDS,ArrayBlockingQueue(1000), NamedThreadFactory("commit"),rejectedExecutionHandler );'''
负责把模型build成solr document,分别有:
BlogBuilder
PeopleBuilder
SolrBuilder
'''                                                                      
class BaseBuilder(Runnable):def _solr_str(self,date):"""1995-12-31T23:59:59Z"""return date.strftime("%Y-%m-%dT%H:%M:%SZ")def run(self):raise NotImplementedErrorclass  BlogBuilder(BaseBuilder):def __init__(self, ids,executor):self.ids = idsself.executor = executorself.solr = solrFactory.get_solr("feed") def _extra_tags(self,blog):iphone_wall_paper = blog.photo_width>=320 and blog.photo_height>=480 and blog.photo_height/float(blog.photo_width) == 1.5 and "INNERTAGIPHWP" or ""iphone_wall_paper = blog.photo_width>=320 and blog.photo_height>=568 and blog.photo_height/float(blog.photo_width) == 1.775 and "INNERTAGIPHWP5" or iphone_wall_paperitem_cat_id = ''item_cat_pid = ''
#        if blog.is_buyable():
#            item_id = shoppingItemService.find_item_id(blog)
#            item_cat = shoppingItemService.find_or_create_taobao_category(None,item_id)
#            if item_cat:
#                item_cat_id =  item_cat.get("cid") and 'ITEMCATID_%s'%item_cat.get("cid") 
#                item_cat_pid = item_cat.get("parent_id")  and 'ITEMCATPID_%s'%item_cat.get("parent_id")
#            else:
#                logger.error('Can not include taobao category info when indexing %s'%blog.id)return (iphone_wall_paper, item_cat_id, item_cat_pid)def run(self):#print 'build %s'%len(self.ids)blog_list = blogQueryService.queryBlogDetail(self.ids)doc = []for blog in blog_list:if not blog.is_normal():continueif blog.is_in_audit():continueblog_tags = tagQueryService.queryTagStrByBlogId(blog.id)extra_tags =  self._extra_tags(blog) #这里可以扩展内部的其他各种查询条件, 比增加字段靠谱, 性能更好, 更灵活data =  {"id": blog.id,"sender_id": blog.sender_id,"msg": blog.msg or "" +" " +"".join(blog_tags)+" " +"".join(extra_tags),"buyable": blog.buyable,"add_datetime": self._solr_str(blog.add_datetime),"last_replied_datetime":self._solr_str(blog.last_replied_datetime),"photo_id":blog.photo_id,"photo_width": blog.photo_width,"photo_height": blog.photo_height,"favorite_count":blog.favorite_count,"category":blog.category,"status":blog.status,"group_id": blog.group_id,"album_id":blog.album_id,"pop_score":1, #已经废弃"shopping_score":1,#已经废弃"src_domain":blog.source_link,"price":blog.price,}doc.append(data)self.executor.execute(Commiter(doc,self.solr)) #提交taskclass Commiter(Runnable):def __init__(self,doc,solr):self.doc = docself.solr = solrdef run(self):#print 'commit %s'%len(self.doc)if self.doc:try:self.solr.add_many(self.doc)self.solr.commit()except Exception,e:logger.exception(e) if __name__ == "__main__":buildExecutor.execute(BlogBuilder([1,2,3,4,5,6,7,8,9,10,11],commitExecutor))import timewhile True:time.sleep(2)

 

  相关解决方案