MsgSender.py 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. #!/usr/bin/python
  2. #coding:utf-8
  3. import sys
  4. import os
  5. import time
  6. import logging
  7. import logging.config
  8. import ConfigParser
  9. import cx_Oracle
  10. import Demo_sms
  11. from apscheduler.schedulers.blocking import BlockingScheduler
  12. os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8'
  13. #os.environ['NLS_LANG'] = 'AMERICAN_AMERICA.ZHS16GBK'
  14. def MsgSender(db_url, defult_recever):
  15. def getNewMsg():
  16. '''
  17. 查询新增的预警消息
  18. '''
  19. v_sql='''select alarm_id,to_char(createtime,'yyyy-mm-dd hh24:mi:ss') as createtime,alarm_type,alarm_level,alarm_text,send_state from zsjk_alarm where createtime>=sysdate-1/24 and send_state in (0,-1) and is_project=0'''
  20. MsgSet=set()
  21. try:
  22. logger.info('getNewMsg exec sql = %s' , v_sql)
  23. cur=con.cursor()
  24. cur.execute(v_sql)
  25. res=cur.fetchall()
  26. logger.info('getNewMsg data number is %d' , len(res))
  27. for MsgList in res:
  28. logger.info('getNewMsg is : %s' , str(MsgList))
  29. MsgSet.add(MsgList)
  30. except Exception:
  31. logger.error('getNewMsg faild ', exc_info=True)
  32. finally:
  33. cur.close()
  34. return MsgSet
  35. def getProject(createtime,alarm_type):
  36. '''
  37. 判断是否为工程状态
  38. '''
  39. v_sql='''select project_id,createtime,create_user,status,starttime,endtime,project_ne from zsjk_project p where p.status=1 and project_ne=:x1 and p.starttime <=to_date(:x2,'yyyy-mm-dd hh24:mi:ss') and p.endtime>=to_date(:x2,'yyyy-mm-dd hh24:mi:ss') and rownum=1'''
  40. try:
  41. logger.info('getProject exec sql = %s [ %s,%s ]' , v_sql,alarm_type,createtime)
  42. cur=con.cursor()
  43. cur.execute(v_sql,x1=alarm_type,x2=createtime)
  44. res=cur.fetchall()
  45. #logger.info('getProject data number is %d' , len(res))
  46. if len(res)>=1:
  47. logger.info('getProject is : %s' , str(MsgList))
  48. return 1
  49. else:
  50. logger.info('not get Project')
  51. return 0
  52. except Exception:
  53. logger.error('getNewMsg faild ', exc_info=True)
  54. return -1
  55. finally:
  56. cur.close()
  57. def updateMsgProject(alarm_id):
  58. '''
  59. 更新告警为工程状态,0非工程、1工程
  60. '''
  61. v_sql='''update zsjk_alarm set is_project=1 where alarm_id=:x1'''
  62. try:
  63. logger.info('updateMsgProject exec sql = %s [ %d ]' , v_sql,alarm_id)
  64. cur=con.cursor()
  65. cur.execute(v_sql,x1=alarm_id)
  66. con.commit()
  67. except Exception:
  68. logger.error('updateMsgProject faild ', exc_info=True)
  69. finally:
  70. cur.close()
  71. def getUsers(alarm_type):
  72. '''
  73. 获取需要发送短信的人员信息
  74. '''
  75. v_sql="select user_id,user_name,phone_number,mobile_from,user_info,role_name,alarm_type from zsjk_user_v where ','||alarm_type||',' like '%,'||:x1||',%'"
  76. userSet=set()
  77. try:
  78. logger.info('getUsers exec sql = %s [ %s ]' , v_sql,alarm_type)
  79. cur=con.cursor()
  80. cur.execute(v_sql,x1=alarm_type)
  81. res=cur.fetchall()
  82. logger.info('getUsers data number is %d' , len(res))
  83. for UserList in res:
  84. #v_user_name=UserList[1]
  85. v_phone_number=UserList[2]
  86. v_mobile_from=UserList[3]
  87. #logger.info('getUsers is : %s' , str(UserList))
  88. v_cmcc_mobile='移动号段'
  89. if v_mobile_from!=v_cmcc_mobile:
  90. logger.info('this phone_number %d is %s , not need send .' , v_phone_number,v_mobile_from)
  91. else:
  92. logger.info('this phone_number %d is %s , need send .' , v_phone_number,v_mobile_from)
  93. userSet.add(v_phone_number)
  94. except Exception:
  95. logger.error('getNewMsg faild ', exc_info=True)
  96. finally:
  97. cur.close()
  98. return userSet
  99. def updateMsgSendState(alarm_id,send_state):
  100. '''
  101. #更新发送状态:0未发送、1已发送、-1发送失败、-2未配置发送对象
  102. '''
  103. v_sql='''update zsjk_alarm set send_state=:x1,send_time=sysdate where alarm_id=:x2'''
  104. try:
  105. logger.info('updateMsgSendState exec sql = %s [ %d,%d ]' , v_sql,send_state,alarm_id)
  106. cur=con.cursor()
  107. cur.execute(v_sql,x1=send_state,x2=alarm_id)
  108. con.commit()
  109. except Exception:
  110. logger.error('updateMsgSendState faild ', exc_info=True)
  111. finally:
  112. cur.close()
  113. def getDefultRecever():
  114. '''
  115. '''
  116. startTime=time.time()
  117. logger.info('task start ... ')
  118. MsgSet=set()
  119. v_userSet=set()
  120. v_is_project=0
  121. usersStr=''
  122. try:
  123. con=cx_Oracle.connect(db_url)
  124. MsgSet=getNewMsg()
  125. if len(MsgSet)>0:
  126. for MsgList in MsgSet:
  127. v_alarm_id=MsgList[0]
  128. v_createtime=MsgList[1]
  129. v_alarm_type=MsgList[2]
  130. v_alarm_text=MsgList[4].decode('utf-8')
  131. v_is_project=getProject(v_createtime,v_alarm_type)
  132. if v_is_project==1:
  133. logger.info('it is project, not need send msg.')
  134. updateMsgProject(v_alarm_id)
  135. elif v_is_project==0:
  136. logger.info('it is not project.')
  137. v_userSet=getUsers(v_alarm_type)
  138. if len(v_userSet)>0:
  139. i=0
  140. for user in v_userSet:
  141. if i==0:
  142. usersStr=str(user)
  143. else:
  144. usersStr=usersStr+','+str(user)
  145. i=i+1
  146. logger.info('get user to need send msg is : %s.',usersStr)
  147. #print v_alarm_text
  148. res=Demo_sms.sendSms(usersStr,v_alarm_text)
  149. if res:
  150. logger.debug('send sms to %s sucess',usersStr)
  151. updateMsgSendState(v_alarm_id,1)
  152. else:
  153. logger.debug('send sms failed.')
  154. updateMsgSendState(v_alarm_id,-1)
  155. else:
  156. logger.info('not get user to need send msg.')
  157. usersStr='13730885681'
  158. v_alarm_text='no recever '+ MsgList[4].decode('utf-8')
  159. res=Demo_sms.sendSms(usersStr, v_alarm_text)
  160. logger.debug('send sms to %s',usersStr)
  161. updateMsgSendState(v_alarm_id,-2)
  162. except Exception:
  163. logger.error('connect db faild : ' , exc_info=True)
  164. return
  165. finally:
  166. con.close()
  167. endTime=time.time()
  168. timeCost= round(endTime - startTime ,2)
  169. logger.info('task finished , used time %d s' , timeCost )
  170. if ( __name__ == "__main__"):
  171. logging.config.fileConfig("logging.conf")
  172. #create logger
  173. logger = logging.getLogger("MsgSender")
  174. cfgfile='conf/MsgSender.conf'
  175. try:
  176. cf = ConfigParser.SafeConfigParser()
  177. cf.read(cfgfile)
  178. db_url = cf.get('db_info', 'db_url')
  179. defult_recever = cf.get('sms', 'defult_recever')
  180. except Exception:
  181. logger.error('parse cfg file %s failed : ' ,cfgfile, exc_info=True)
  182. sys.exit(-1)
  183. try:
  184. scheduler = BlockingScheduler()
  185. scheduler.add_job(MsgSender, 'cron', args=(db_url, defult_recever), second='0/30')
  186. while True:
  187. scheduler.start() #采用的是阻塞的方式,只有一个线程专职做调度的任务
  188. except (KeyboardInterrupt, SystemExit):
  189. scheduler.shutdown()