Python实现商品价格监控的方法-创新互联-成都创新互联网站建设

关于创新互联

多方位宣传企业产品与服务 突出企业形象

公司简介 公司的服务 荣誉资质 新闻动态 联系我们

Python实现商品价格监控的方法-创新互联

创新互联www.cdcxhl.cn八线动态BGP香港云服务器提供商,新人活动买多久送多久,划算不套路!

创新互联公司主要从事网站设计、成都做网站、网页设计、企业做网站、公司建网站等业务。立足成都服务西湖,十多年网站建设经验,价格优惠、服务专业,欢迎来电咨询建站服务:18982081108

这篇文章将为大家详细讲解有关Python实现商品价格监控的方法,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。

一年一度的“双十一”又要来了,很多人已经开始摩拳擦掌,毕竟几天之后手还在不在就不好说了。

各种社交软件也是跟着遭殃,整天就是“来帮我一起盖楼”,各种字体绕过屏蔽,什么奇葩的脑洞也出来了:

Python实现商品价格监控的方法

不过也感谢这些电商平台,让多年未联系的好友、加过但没有对话的陌生人都找到了打破尴尬的话题。(让场面更加尴尬)

月薪上万的白领们为了2块5毛钱的优惠券起早贪黑,也是堪称人类迷惑行为大赏了……

问题是,你以为自己真的赚到了?

商品“明降暗升”的传言早有耳闻:很多商品在双十一之前早早地把价格调高,加上优惠之后也不过就是跟以前的原价相当。让不知情的消费者在心理上感觉占了便宜。

这个传言是不是真的,很好判断,只要定期去访问商品页面,记录价格就可以。不过一般人也没闲工夫这么去做。于是,我们用 Python 做了一个可以定时监控商品的小工具,可以帮你监控想要关注的商品。

工具完成之后,我们随机挑选了几个商品作为测试,结果就有一个中招了……(真的是随便选的):

Python实现商品价格监控的方法

这款保暖背心产品,之前标价 39.9元,到11月之后却突然调价为 49.9元,并标注上了“双11狂欢价”,也就是原价……

Python实现商品价格监控的方法

Python实现商品价格监控的方法

商品价格监控

实现功能

·输入天猫、苏宁、京东、拼多多(网页页面 http://yangkeduo.com/)任一商品链接,不是口令。请复制选择好商品配置的页面链接,即返回相应商品价格,并保存到文件。商品页面若有团购与单独购买两个价格,返回团购价格。

·使用 Windows 任务计划或 Linux 定时任务,定时执行程序。获取不同时段的商品价格信息。

·单独运行画图程序,可根据定时任务获取的数据,生成商品价格时间变化折线图。

·程序监测的两件商品截图如下,具体文件在 pic 文件夹下 bnbx.html、kyy.html,推荐本地查看。

Python实现商品价格监控的方法

Python实现商品价格监控的方法

简单的商品查看页面 https://htmlpreview.github.io/?https://raw.githubusercontent.com/spiderbeg/price_monitor/master/search/search.html 。输入查询商品关键词,选择商城,即可查看相应商城商品列表。默认为苏宁。效果图如下。注意:点击后请等待一段时间即可,请勿频繁刷新。

Python实现商品价格监控的方法

运行环境

·python3.7

·Windows

·jupyter notebook

运行依赖包

·requests

·pyecharts

·beautifulsoup4

项目思路

部分问题回答

项目的大致思路流程:

·第一步:使用商品详细页链接获取商品信息与商品价格,并保存获取数据 时间、商品介绍,价格 到 csv 文件中;

·第二步:使用定时任务定时执行第一步完成的程序;

·第三步:读取前两步获取到的时间、商品介绍、价格数据。使用 pyecharts 绘制绘制商品价格时间变化折线图。

为什么不使用 pc 端来调试网页,获取价格信息?

因为在未登录状态天猫的详细商品页的信息是虚假的,同时从移动端网页入手,可以降低调试难度。

谷歌浏览器如何开启手机调试模式?

F12 进入开发者模式,然后鼠标点击一下,具体见下图,包括后文的查找价格接口信息。

Python实现商品价格监控的方法

实现代码

test.py

测试商品链接是否能够成功获取到商品价格。

import timing
"""
1、调用 timing.py 中的 go 方法测试链接的可用性
2、调用 timing.py 中的 go, get_url() 方法测试 goods.csv 文件中链接的可用性
"""
# 链接测试
# urls = ['https://m.suning.com/product/0000000000/000000011210599174.html?utm_source=baidu&utm_midium=brand-wuxian 
&utm_content=&utm_campaign=title&safp=f73ee1cf.wapindex7.113464229882.4&safc=prd.1.rec_14-40_0_A_ab:A',
# 'https://m.suning.com/product/0070067092/000000000188392234.html?utm_source=baidu&utm_midium=brand-wuxian& 
utm_content=&utm_campaign=title&safp=f73ee1cf.wapindex7.113464229882.60&safc=prd.1.rec_5-5_1018C,1014C$
c3ae37eafeb814a098d120647449da6f_H_ab:A',
# 'https://m.suning.com/product/0000000000/000000000107426461.html?src=snsxpd_none_recssxcnxhq_1-3_p_0000000000 
_000000000107426461_rec_21-65_3_A&safp=f73ee1cf.71jyzx.112079032536.4&safc=prd.1.rec_21-65_3_A',
# 'https://m.suning.com/product/0000000000/10606656136.html?safp=f73ee1cf.phone2019.121927933306.2&safc=prd.0.0']
# 输入文本的链接可用性测试
if __name__ == '__main__':
    urls = timing.get_url()
    for url in urls:
        try:
            timing.go(url) # 获取返回信息 
        except BaseException as e:
            print(url,'\n',e)

timing.py

进行定时抓取任务时,运行的文件。

# encoding:utf8
import time
import os
import re
import csv
from shop.jd import JD # 自定义
from shop.tm import TM
from shop.sn import SN
from shop.pdd import PDD
from apscheduler.schedulers.blocking import BlockingScheduler

# import logging
# formats = "%(asctime)s %(name)s %(levelname)s function:%(funcName)s -> :%(message)s"
# logging.basicConfig(format=formats, datefmt='%m/%d/%Y %I:%M:%S %p') # ,handlers=[logging.FileHandler(log_path, 
'a+', 'utf-8')]
# LOGGER = logging.getLogger(__name__)
# LOGGER.setLevel(logging.INFO)
basePath = os.path.dirname(os.path.abspath(__file__)) # 当前文件夹
def get_date():
    """获取日期"""
    timestamp = int(time.time())
    time_local = time.localtime(timestamp) # #时间戳 转 时间数组
    dt = time.strftime("%Y-%m-%d %H:%M:%S",time_local) # #时间数组 转 新的时间格式(2016-05-05 20:28:54)
    return dt
def get_url():
    """读取商品链接
    返回:图像名,商品名,商品链接 元组
    """
    urls = []
    with open(os.path.join(basePath, 'goods.csv'),'r',encoding='utf8') as f:
        f_csv = csv.reader(f)
        next(f_csv) # 返回标题,直接到内容
        for row in f_csv: # 内容
            if row:
                urls.append(row)
    return urls
def go(url):
    '''输入:链接
    输出:(时间,标题,商品价格), 文件路径 元组
    统一价格输出,以最低价格为标准,如有团购和单独购买以单独购买为准
    '''
    result = re.findall('://(.+?).com', url[2])
    if result:
        result = result[0]
        if 'yangkeduo' in result:
            pd = PDD(url[2])
            title,price = pd.main()
        elif 'suning' in result:
            sn = SN(url[2])
            title,price = sn.main()
        elif 'tmall' in result or 'taobao' in result:
            tm = TM(url[2]) # 605030977928:联想笔记本 ; 603330883901 华为 mate30 pro ; 523962011119: 酸奶 
            title,price = tm.main()
        elif 'jd' in result:
            jd = JD(url[2]) # 测试 id:100009083152 商品:联想 y9000x 笔记本电脑 2 热水壶 or 薯条?
            title,price = jd.main()
        else:
            raise TypeError('请检查输入的网站链接')
        print('%s 标题 %s, 价格(多个价格以团购为准) %s. '%(result,title,price))
    else:
        raise TypeError('请检查输入是否为目标网站的商品详细页面链接')
    # 文件名
    replace_string = ['.',' ',r'/',r'\\']
    for rs in replace_string:
        url[1] = url[1].replace(rs,'_')
    path = os.path.join(os.path.join(basePath, 'data'), url[1]+'.csv')
    today = get_date() # 日期
    return (today, title, price),path
def addData(row, path):
    """数据写入文件"""
    with open(path,'a+',encoding='utf8') as f:
        fieldnames = ['时间', '标题','价格']
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        if f.tell() == 0: # 如果内容为空则添加标题
            writer.writeheader()
        writer.writerow({'时间': row[0], '标题': row[1],'价格':row[2]})
def main():
    """运行程序"""
    urls = get_url()
    for url in urls:
        try:
            row,path = go(url) # 获取返回信息 
            addData(row,path) # 写入文件
        except BaseException as e:
            print('请求问题?报错:%s'%e)
if __name__ == '__main__':
    print('时间',get_date())
    main()
    # scheduler = BlockingScheduler()
    # scheduler.add_job(go,'cron', args=[url],hour='8-23', minute= '5,35' , second='15')
    # # scheduler.add_job(main,'cron', args=[3088512],hour='8-23', minute= 5 , second='15')
    # print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))
    # try:
    #     scheduler.start()
    # except (KeyboardInterrupt, SystemExit):
    #     pass

draw.py

图像文件生成在 pic 文件中。

# encoding: utf8
from pyecharts import options as opts
from pyecharts.charts import Page, Line
import os
import csv

basePath = os.path.dirname(os.path.abspath(__file__)) # 当前文件夹
def line(title,checktime,price) -> Line:
    """绘图函数"""
    c = (
        Line()
        .add_xaxis(checktime)
        .add_yaxis(title, price, is_smooth=True)
        .set_global_opts(title_opts=opts.TitleOpts(title="商品价格"),
                yaxis_opts=opts.AxisOpts(name="元/台"),
                xaxis_opts=opts.AxisOpts(name=title,
                axislabel_opts=opts.LabelOpts(formatter="{value}", font_size=12, rotate=30,) # x,y 轴标签
                    )
                )
        )
    return c
def files():
    """
    输出字典,每一个键值代表一张图表
    """
    global basePath
    files = {}
    with open(os.path.join(basePath,'goods.csv'),'r',encoding='utf8') as f:
        f_csv = csv.reader(f)
        next(f_csv) # 标题
        for row in f_csv: # 内容
            if row:
                replace_string = ['.',' ',r'/',r'\\'] # 特殊字符处理
                for rs in replace_string:
                    row[1] = row[1].replace(rs,'_')
                files.setdefault(row[0],[]).append(row[1])
    return files
def draw(files):
    """绘制图形文件"""
    datapath = os.path.join(basePath,'data')
    picpath = os.path.join(basePath,'pic')
    for k,i in files.items():
        page = Page()
        for n in i:
            try:
                with open(os.path.join(datapath, n +'.csv'),'r', encoding='utf8') as f:
                    f_csv = csv.DictReader(f)
                    price,checktime = [],[]
                    for row in f_csv:
                        checktime.append(row['时间'])
                        price.append(row['价格'])
                    title = n
                page.add(line(title,checktime,price)) # 24 发帖回帖变化图、近3月变化图、浏览、回复散点图
            except:
                print('未制图:',n)
        page.render(os.path.join(picpath, k +'.html'))
if __name__ == '__main__':
    draw(files())

关于Python实现商品价格监控的方法就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。


文章名称:Python实现商品价格监控的方法-创新互联
转载注明:http://kswsj.cn/article/coojoh.html

其他资讯