重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
创新互联www.cdcxhl.cn八线动态BGP香港云服务器提供商,新人活动买多久送多久,划算不套路!
创新互联公司主要从事网站设计、成都做网站、网页设计、企业做网站、公司建网站等业务。立足成都服务西湖,十多年网站建设经验,价格优惠、服务专业,欢迎来电咨询建站服务:18982081108这篇文章将为大家详细讲解有关Python实现商品价格监控的方法,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。
一年一度的“双十一”又要来了,很多人已经开始摩拳擦掌,毕竟几天之后手还在不在就不好说了。
各种社交软件也是跟着遭殃,整天就是“来帮我一起盖楼”,各种字体绕过屏蔽,什么奇葩的脑洞也出来了:
不过也感谢这些电商平台,让多年未联系的好友、加过但没有对话的陌生人都找到了打破尴尬的话题。(让场面更加尴尬)
月薪上万的白领们为了2块5毛钱的优惠券起早贪黑,也是堪称人类迷惑行为大赏了……
问题是,你以为自己真的赚到了?
商品“明降暗升”的传言早有耳闻:很多商品在双十一之前早早地把价格调高,加上优惠之后也不过就是跟以前的原价相当。让不知情的消费者在心理上感觉占了便宜。
这个传言是不是真的,很好判断,只要定期去访问商品页面,记录价格就可以。不过一般人也没闲工夫这么去做。于是,我们用 Python 做了一个可以定时监控商品的小工具,可以帮你监控想要关注的商品。
工具完成之后,我们随机挑选了几个商品作为测试,结果就有一个中招了……(真的是随便选的):
这款保暖背心产品,之前标价 39.9元,到11月之后却突然调价为 49.9元,并标注上了“双11狂欢价”,也就是原价……
商品价格监控
实现功能
·输入天猫、苏宁、京东、拼多多(网页页面 http://yangkeduo.com/)任一商品链接,不是口令。请复制选择好商品配置的页面链接,即返回相应商品价格,并保存到文件。商品页面若有团购与单独购买两个价格,返回团购价格。
·使用 Windows 任务计划或 Linux 定时任务,定时执行程序。获取不同时段的商品价格信息。
·单独运行画图程序,可根据定时任务获取的数据,生成商品价格时间变化折线图。
·程序监测的两件商品截图如下,具体文件在 pic 文件夹下 bnbx.html、kyy.html,推荐本地查看。
简单的商品查看页面 https://htmlpreview.github.io/?https://raw.githubusercontent.com/spiderbeg/price_monitor/master/search/search.html 。输入查询商品关键词,选择商城,即可查看相应商城商品列表。默认为苏宁。效果图如下。注意:点击后请等待一段时间即可,请勿频繁刷新。
运行环境
·python3.7
·Windows
·jupyter notebook
运行依赖包
·requests
·pyecharts
·beautifulsoup4
项目思路
部分问题回答
项目的大致思路流程:
·第一步:使用商品详细页链接获取商品信息与商品价格,并保存获取数据 时间、商品介绍,价格 到 csv 文件中;
·第二步:使用定时任务定时执行第一步完成的程序;
·第三步:读取前两步获取到的时间、商品介绍、价格数据。使用 pyecharts 绘制绘制商品价格时间变化折线图。
为什么不使用 pc 端来调试网页,获取价格信息?
因为在未登录状态天猫的详细商品页的信息是虚假的,同时从移动端网页入手,可以降低调试难度。
谷歌浏览器如何开启手机调试模式?
F12 进入开发者模式,然后鼠标点击一下,具体见下图,包括后文的查找价格接口信息。
实现代码
test.py
测试商品链接是否能够成功获取到商品价格。
import timing """ 1、调用 timing.py 中的 go 方法测试链接的可用性 2、调用 timing.py 中的 go, get_url() 方法测试 goods.csv 文件中链接的可用性 """ # 链接测试 # urls = ['https://m.suning.com/product/0000000000/000000011210599174.html?utm_source=baidu&utm_midium=brand-wuxian &utm_content=&utm_campaign=title&safp=f73ee1cf.wapindex7.113464229882.4&safc=prd.1.rec_14-40_0_A_ab:A', # 'https://m.suning.com/product/0070067092/000000000188392234.html?utm_source=baidu&utm_midium=brand-wuxian& utm_content=&utm_campaign=title&safp=f73ee1cf.wapindex7.113464229882.60&safc=prd.1.rec_5-5_1018C,1014C$ c3ae37eafeb814a098d120647449da6f_H_ab:A', # 'https://m.suning.com/product/0000000000/000000000107426461.html?src=snsxpd_none_recssxcnxhq_1-3_p_0000000000 _000000000107426461_rec_21-65_3_A&safp=f73ee1cf.71jyzx.112079032536.4&safc=prd.1.rec_21-65_3_A', # 'https://m.suning.com/product/0000000000/10606656136.html?safp=f73ee1cf.phone2019.121927933306.2&safc=prd.0.0'] # 输入文本的链接可用性测试 if __name__ == '__main__': urls = timing.get_url() for url in urls: try: timing.go(url) # 获取返回信息 except BaseException as e: print(url,'\n',e)
timing.py
进行定时抓取任务时,运行的文件。
# encoding:utf8 import time import os import re import csv from shop.jd import JD # 自定义 from shop.tm import TM from shop.sn import SN from shop.pdd import PDD from apscheduler.schedulers.blocking import BlockingScheduler # import logging # formats = "%(asctime)s %(name)s %(levelname)s function:%(funcName)s -> :%(message)s" # logging.basicConfig(format=formats, datefmt='%m/%d/%Y %I:%M:%S %p') # ,handlers=[logging.FileHandler(log_path, 'a+', 'utf-8')] # LOGGER = logging.getLogger(__name__) # LOGGER.setLevel(logging.INFO) basePath = os.path.dirname(os.path.abspath(__file__)) # 当前文件夹 def get_date(): """获取日期""" timestamp = int(time.time()) time_local = time.localtime(timestamp) # #时间戳 转 时间数组 dt = time.strftime("%Y-%m-%d %H:%M:%S",time_local) # #时间数组 转 新的时间格式(2016-05-05 20:28:54) return dt def get_url(): """读取商品链接 返回:图像名,商品名,商品链接 元组 """ urls = [] with open(os.path.join(basePath, 'goods.csv'),'r',encoding='utf8') as f: f_csv = csv.reader(f) next(f_csv) # 返回标题,直接到内容 for row in f_csv: # 内容 if row: urls.append(row) return urls def go(url): '''输入:链接 输出:(时间,标题,商品价格), 文件路径 元组 统一价格输出,以最低价格为标准,如有团购和单独购买以单独购买为准 ''' result = re.findall('://(.+?).com', url[2]) if result: result = result[0] if 'yangkeduo' in result: pd = PDD(url[2]) title,price = pd.main() elif 'suning' in result: sn = SN(url[2]) title,price = sn.main() elif 'tmall' in result or 'taobao' in result: tm = TM(url[2]) # 605030977928:联想笔记本 ; 603330883901 华为 mate30 pro ; 523962011119: 酸奶 title,price = tm.main() elif 'jd' in result: jd = JD(url[2]) # 测试 id:100009083152 商品:联想 y9000x 笔记本电脑 2 热水壶 or 薯条? title,price = jd.main() else: raise TypeError('请检查输入的网站链接') print('%s 标题 %s, 价格(多个价格以团购为准) %s. '%(result,title,price)) else: raise TypeError('请检查输入是否为目标网站的商品详细页面链接') # 文件名 replace_string = ['.',' ',r'/',r'\\'] for rs in replace_string: url[1] = url[1].replace(rs,'_') path = os.path.join(os.path.join(basePath, 'data'), url[1]+'.csv') today = get_date() # 日期 return (today, title, price),path def addData(row, path): """数据写入文件""" with open(path,'a+',encoding='utf8') as f: fieldnames = ['时间', '标题','价格'] writer = csv.DictWriter(f, fieldnames=fieldnames) if f.tell() == 0: # 如果内容为空则添加标题 writer.writeheader() writer.writerow({'时间': row[0], '标题': row[1],'价格':row[2]}) def main(): """运行程序""" urls = get_url() for url in urls: try: row,path = go(url) # 获取返回信息 addData(row,path) # 写入文件 except BaseException as e: print('请求问题?报错:%s'%e) if __name__ == '__main__': print('时间',get_date()) main() # scheduler = BlockingScheduler() # scheduler.add_job(go,'cron', args=[url],hour='8-23', minute= '5,35' , second='15') # # scheduler.add_job(main,'cron', args=[3088512],hour='8-23', minute= 5 , second='15') # print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C')) # try: # scheduler.start() # except (KeyboardInterrupt, SystemExit): # pass
draw.py
图像文件生成在 pic 文件中。
# encoding: utf8 from pyecharts import options as opts from pyecharts.charts import Page, Line import os import csv basePath = os.path.dirname(os.path.abspath(__file__)) # 当前文件夹 def line(title,checktime,price) -> Line: """绘图函数""" c = ( Line() .add_xaxis(checktime) .add_yaxis(title, price, is_smooth=True) .set_global_opts(title_opts=opts.TitleOpts(title="商品价格"), yaxis_opts=opts.AxisOpts(name="元/台"), xaxis_opts=opts.AxisOpts(name=title, axislabel_opts=opts.LabelOpts(formatter="{value}", font_size=12, rotate=30,) # x,y 轴标签 ) ) ) return c def files(): """ 输出字典,每一个键值代表一张图表 """ global basePath files = {} with open(os.path.join(basePath,'goods.csv'),'r',encoding='utf8') as f: f_csv = csv.reader(f) next(f_csv) # 标题 for row in f_csv: # 内容 if row: replace_string = ['.',' ',r'/',r'\\'] # 特殊字符处理 for rs in replace_string: row[1] = row[1].replace(rs,'_') files.setdefault(row[0],[]).append(row[1]) return files def draw(files): """绘制图形文件""" datapath = os.path.join(basePath,'data') picpath = os.path.join(basePath,'pic') for k,i in files.items(): page = Page() for n in i: try: with open(os.path.join(datapath, n +'.csv'),'r', encoding='utf8') as f: f_csv = csv.DictReader(f) price,checktime = [],[] for row in f_csv: checktime.append(row['时间']) price.append(row['价格']) title = n page.add(line(title,checktime,price)) # 24 发帖回帖变化图、近3月变化图、浏览、回复散点图 except: print('未制图:',n) page.render(os.path.join(picpath, k +'.html')) if __name__ == '__main__': draw(files())
关于Python实现商品价格监控的方法就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。