缘由
由于我个人有个小博客,然后还录制了一些课程,所有就建立了对应的微信群,但是微信群的二维码是变化的,总不能经常换了。所以就想搞一个微信机器人,自动加 微信,自动拉群。废话不多说,先上图
效果展示
大家可以试试效果,效果相当棒
需求
其他人可以通过二维码加我好友 自动通过
好友通过之后 主动发送一些邀请入群链接和介绍信
回复 邀请 、加群 可以发送邀请入群链接
所有聊天数据都要存储起来 并且可以通过Web展示
根据用户输入信息 回复相关内容
技术选型
Python3
Flask:轻量级的MVC 框架
itchat :实现微信API相关接口
MySQL:存储相关微信内容
图灵机器人:调用API实现主动场景回复
架构
本人没有使用flask 默认的MVC方式,使用了我自己实现过后的:结构分层明了的MVC框架 。结构如下
每一个文件夹都有自己明确的作用
├── application.py ├── common │ ├── libs │ └── models ├── config │ ├── base_setting.py │ ├── local_setting.py │ ├── production_setting.py ├── controllers │ ├── index.py │ ├── member.py ├── interceptors │ ├── Auth.py │ ├── errorHandler.py ├── jobs │ ├── launcher.py │ ├── movie.py │ └── tasks ├── manager.py ├── requirement.txt ├── static │ ├── js │ └── plugins ├── templates │ ├── common │ ├── index.html │ ├── info.html │ └── member ├── test │ └── apsch_test.py ├── tornado_server.py └── www.py
实现源码
这里将主要实现源码如下
# -*- coding: utf-8 -*- from application import app,db import itchat,signal,json,time,os,random import multiprocessing from ctypes import c_bool, c_int from common.libs.tuling123.TuLingService import TuLingService from common.libs.wxbot.WeChatBotService import WeChatBotService from common.models.queue import QueueList from common.libs.DateUtil import getCurrentTime import traceback,requests ''' python manage.py runjob -m wechatbot/index ''' class JobTask(): def __init__(self): pass def run(self, params): try: # 增加重试连接次数 # requests使用了urllib3库,默认的http connection是keep-alive的,requests设置False关闭。 s = requests.session() s.keep_alive = False itchat.auto_login( enableCmdQR=2,hotReload=True ) threads = [] t_queue_consumer = multiprocessing.Process(target = syncConsumer, args=("t_queue_consumer",) ) t_queue_consumer.daemon = True threads.append( t_queue_consumer ) app.logger.info( 'Parent process {0} is Running'.format( os.getpid() ) ) for i in range( len(threads) ): threads[i].start() #初始化所有的朋友,存入队列,然后进程做事情 queue_msg = { 'type': 'friends', 'data': itchat.get_friends( update=True ) } global_queue.put(queue_msg) ##必须要把群聊保存到通讯录才行,不会无法获取到的 #https://github.com/littlecodersh/ItChat/issues/480 queue_msg = { 'type': 'group', 'data': itchat.get_chatrooms(update=True) } global_queue.put(queue_msg) itchat.run() except Exception as err: app.logger.error( "=========itchat:run error========" ) traceback.print_exc() exit( 15 ) @staticmethod def exit( signum, frame ): global sendmail_flag app.logger.info( "手动退出~~" ) app.logger.info( "signum:%s,frame:%s,sendmail_flag:%s"%( signum,frame,sendmail_flag ) ) if not sendmail_flag: sendmail_flag = True #itchat.logout()#执行这个退出之后下次就必须要在扫码才能登陆 from common.libs.queue.QueueListService import QueueListService notice_users = QueueListService.getNoticeUsers(ids=[app.config['NOTICE_WECHAT_USER']['family_host_man']]) data = { "msg": "微信机器人 倒了,需要人为干预,修复好~~", "title": "【提醒】微信机器人 倒了", "notice_users": notice_users } QueueListService.addQueueMsg(queue_name="reminder", data=data, type=2) exit( 15 ) # http://itchat.readthedocs.io/zh/latest/ # 加入图灵机器人:https://www.kancloud.cn/turing/web_api/522992 # 关于进程的文章:https://mozillazg.com/2017/07/python-how-to-generate-kill-clean-zombie-process.html @itchat.msg_register(itchat.content.INCOME_MSG) def handle_msg(msg): queue_msg = { 'type':'msg', 'data':msg } global_queue.put( queue_msg ) if msg.Type == itchat.content.TEXT and msg.FromUserName != "newsapp": if msg.Type == itchat.content.TEXT and ( "邀请" in msg.Text or "加群" in msg.Text ) : # 同时发送加群邀请链接 itchat.add_member_into_chatroom( get_group_id("编程浪子小天地"), [{'UserName':msg.FromUserName }], useInvitation=True ) return None ''' 需要过滤掉几种情况,例如自己发送的不要自己回复自己了 ''' tmp_msg = [] tmp_msg.append( TuLingService.getRobotAnswer( msg.Text ) ) tmp_msg.extend( getAd() ) msg.user.send( "\n".join( tmp_msg ) ) elif msg.Type == itchat.content.FRIENDS: msg.user.verify() tmp_msg = [ "欢迎进入编程浪子机器人世界~~", "你可以输入想对我说的话,会有彩蛋", "个人博客:http://www.54php.cn", "回复 邀请 、 加群 关键字 可以加入群", "获取慕课课程账号,请关注公众号" ] tmp_msg.extend( getAd( is_rand = False ) ) msg.user.send( "\n".join( tmp_msg ) ) itchat.send_image( app.root_path + "/web/static/images/wechat/coderonin.jpg" ,msg['RecommendInfo']['UserName'] ) itchat.send_image( app.root_path + "/web/static/images/wechat/mina.jpg" ,msg['RecommendInfo']['UserName'] ) #同时发送加群邀请链接 itchat.add_member_into_chatroom( get_group_id("编程浪子小天地"), [{'UserName': msg['RecommendInfo']['UserName'] }], useInvitation=True) elif msg.Type in [ itchat.content.SYSTEM, itchat.content.NOTE ]: #note表示添加好友通过后的备注内容 pass else: if msg.Type == "Sharing" and msg.Text == "邀请你加入群聊": return None tmp_msg = [ "我还是个孩子~~无法识别你的发的什么哦" ] tmp_msg.extend(getAd()) msg.user.send( "\n".join( tmp_msg ) ) return None @itchat.msg_register(itchat.content.INCOME_MSG, isGroupChat=True) def handle_group_msg(msg): app.logger.info("type:{0},text:{1}".format(msg.Type, msg.Text)) app.logger.info(msg) return None def getAd( is_rand = True ): ad_urls = [ "http://t.cn/AiK3JhDK", "http://t.cn/AiK3JLD5", "http://t.cn/AiK3JqKX", "http://t.cn/AiK3JfqQ", "http://t.cn/AiK3JXWa", "http://t.cn/AiK3JNvY", "http://t.cn/AiKS9XZR", "http://t.cn/AiK3JQe3", "http://t.cn/AiK3JuJi", "http://t.cn/AiK3JeOC", "http://t.cn/AiK3ivyk", "http://t.cn/AiK3izxl", "http://t.cn/AiK3iLVH" ] tmp_msg = [ "", "==================", "望大家多多支持,关注公众号:CodeRonin", "支持浪子点击文章最下面AD:" + ad_urls[ random.randint(1, len( ad_urls ) ) - 1 ] ] if is_rand and random.randint(1,10) < 8: tmp_msg = [] return tmp_msg # 获得群聊id def get_group_id(group_name): group_list = itchat.search_chatrooms(name=group_name) return group_list[0]['UserName'] #捕获退出信号,例如control + c signal.signal(signal.SIGINT, JobTask.exit ) signal.signal(signal.SIGTERM, JobTask.exit ) ''' 存储到数据库我使用线程处理,这样异步不影响返回 queue模块有三种队列及构造函数: Python queue模块的FIFO队列先进先出。 class queue.Queue(maxsize) LIFO类似于堆,即先进后出。 class queue.LifoQueue(maxsize) 还有一种是优先级队列级别越低越先出来。 class queue.PriorityQueue(maxsize) ''' manager = multiprocessing.Manager() global_queue = multiprocessing.Queue() sendmail_flag = multiprocessing.Value( c_bool,False ) def syncConsumer(name): with app.app_context(): # http://flask-sqlalchemy.pocoo.org/2.3/contexts/ while True: try: app.logger.info( "syncConsumer,pid:%s"%( os.getpid() ) ) queue_msg = global_queue.get() # get_nowait()不会阻塞等待 app.logger.info('[%s]取到了[%s]' % (name, queue_msg['type'])) if queue_msg['type'] in [ "friends","group" ]: type = 2 if queue_msg['type'] == "group" else 1 WeChatBotService.initContact( queue_msg['data'],type ) elif queue_msg['type'] == "msg": WeChatBotService.addHistory(queue_msg['data']) time.sleep(1) except Exception as err: traceback.print_exc() exit( 15 )
总结
通过借助于 itchat 和图片机器人 ,然后基于 Python3 + Flask MVC + MySQL 就可以实现一个自己的微信机器人 ,过程其实相当简单,但是需要大家对 Python3 + Flask MVC 有一定的了解,并且本人根据项目经验总结出来了一套个性化的 :结构分层明了、高并发 MVC框架。
广告
如果您对Python Flask学习感兴趣 ,而您整好基础薄弱,可以看看我的两个课程,仅供您参考
Python Flask 从入门到精通: https://coding.imooc.com/class/399.html
Python Flask构建微信小程序订餐系统: https://coding.imooc.com/class/265.html