Python爬虫系列之多多买菜小程序数据爬取

释放双眼,带上耳机,听听看~!

代码仅供学习交流与参考,请勿用于非法用途;如有侵权,请联系删除。

直接上代码

# -*- coding:utf-8 -*-
import requests
import json
import time
from general import getAntiContent
import random
import configparser
import MySQLdb
import os

# ---------------------------------------------------------------------------
# Module-level settings: request headers, retry/timeout values and everything
# read from conf.ini. All of this runs at import time.
# ---------------------------------------------------------------------------

# Token placed in the "accesstoken" request header; empty in this source.
accesstoken = ""
headers = {
	"content-type": "application/json;charset=UTF-8",
	"accesstoken": accesstoken,
	"referer": "https://servicewechat.com/wxd9813e0a0d4d4156/49/page-frame.html",
	"user-agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/7.0.17(0x17001124) NetType/WIFI Language/zh_CN",
	"code-version": "0.0.43",
	"verifyauthtoken": "",
	"p-appname": "mobile-xcx-vegetable",
}
# NOTE(review): retry/timeout are not used by any code visible here;
# presumably consumed by the HTTP helper -- confirm.
retry = 3
timeout = 20
# Filled by iniProvinceMap(): region_name -> region dict.
provinceMap = {}
cf = configparser.ConfigParser()
# ConfigParser.read() skips files it cannot open instead of raising and
# returns the list of files it actually parsed, so a missing conf.ini has to
# be detected from that return value. Parse/decoding errors still raise.
try:
	_confLoaded = bool(cf.read(os.getcwd() + "/conf.ini", encoding="utf-8-sig"))
except Exception as e:
	_confLoaded = False
if not _confLoaded:
	print("程序目录下不存在conf.ini配置文件~")
	exit(0)

# Comma-separated keyword groups; each group is later split on "-" by
# getCityMaps().
keywords = ""
try:
	keywords = getConf("app-sys", "keywords").split(",")
except Exception as e:
	print("keywords参数错误!")
	exit(0)
# Start time points: comma-separated string from the config.
startTime = getConf("app-sys", "start")
startTimes = []
try:
	# "".split(",") yields [""], which means "nothing configured".
	startTimes = [] if startTime == "" else startTime.split(",")
except AttributeError:
	# startTime is not a string (e.g. None): keep the empty default.
	pass
# Database user
mysql_user = getConf("Mysql-Database", "user")
# Database password
mysql_password = getConf("Mysql-Database", "password")
# Database name
mysql_database = getConf("Mysql-Database", "database")
# Host address
mysql_host = getConf("Mysql-Database", "host")
# Port
# NOTE(review): none of the MySQLdb.connect() calls visible in this file pass
# this value, so connections use the driver's default port.
mysql_port = getConf("Mysql-Database", "port")


def querySQL(sql):
	"""Run a query against the configured MySQL database and fetch all rows.

	Args:
		sql: Complete SQL statement. It is executed as-is with no parameter
			binding, so it must not be assembled from untrusted input.

	Returns:
		The result of ``cursor.fetchall()``, or ``False`` when connecting or
		executing fails.
	"""
	conn = None
	try:
		conn = MySQLdb.connect(user=mysql_user, password=mysql_password, host=mysql_host, database=mysql_database, charset='utf8')
		cursor = conn.cursor()
		cursor.execute(sql)
		return cursor.fetchall()
	except Exception as e:
		return False
	finally:
		# A new connection is opened on every call, so it must be released
		# here or each query leaks one server connection.
		if conn is not None:
			try:
				conn.close()
			except Exception as e:
				# Best-effort cleanup; the result is already decided.
				pass


def getCurrDate():
	"""Return the current local date as ``YYYY年MM月DD日``."""
	# The unit characters are substituted after strftime runs, so the
	# strftime format itself contains only ASCII.
	units = {'y': '年', 'm': '月', 'd': '日'}
	template = time.strftime('%Y{y}%m{m}%d{d}')
	return template.format(**units)

def tsToDate(ts):
	"""Format a Unix timestamp as local ``YYYY-MM-DD HH:MM:SS``.

	Args:
		ts: Seconds since the epoch, as a number or numeric string.

	Returns:
		The formatted local time, or ``""`` when ``ts`` is falsy
		(``None``, ``0``, ``""``).
	"""
	if not ts:
		return ""
	moment = time.localtime(int(ts))
	return time.strftime("%Y-%m-%d %H:%M:%S", moment)


def getCurrentTime():
	"""Return the current local time as ``YYYY-MM-DD HH:MM:SS``."""
	# strftime() formats the current local time when no time tuple is given.
	return time.strftime('%Y-%m-%d %H:%M:%S')

def getCityMaps():
	"""Turn the configured keyword groups into a lookup dict.

	Each entry of the module-level ``keywords`` list is split on ``"-"``.
	The first segment becomes the dict key and the next three become the
	``city``, ``scity`` and ``key`` fields. Entries with fewer than four
	segments, or that cannot be split at all, are skipped.

	Returns:
		``{first_segment: {"city": ..., "scity": ..., "key": ...}}``; empty
		when ``keywords`` is not a non-empty list.
	"""
	result = {}
	if not (isinstance(keywords, list) and keywords):
		return result
	for entry in keywords:
		try:
			# Unpacking raises for entries with too few segments.
			head, city, scity, key = entry.split("-")[:4]
		except Exception as e:
			continue
		result[head] = {"city": city, "scity": scity, "key": key}
	return result


def iniProvinceMap():
	"""Fill the module-level ``provinceMap`` from the regions endpoint.

	Posts a regions request with ``region_id`` 1 and stores every returned
	region dict in ``provinceMap`` under its ``region_name``. Regions that
	cannot be stored (for example, no ``region_name``) are skipped.

	Returns:
		``True`` when the response had a ``regions`` entry that could be
		iterated, ``False`` otherwise.
	"""
	global provinceMap
	endpoint = "https://api.pinduoduo.com/api/mc/v1/user/regions"
	payload = json.dumps({
		"open_app_source": 1089,
		"anti_content": getAntiContent(),
		"region_id": 1,
		"xcx_version": "0.0.64"
	})
	reply = postHtml(endpoint, payload)
	try:
		for entry in reply['regions']:
			try:
				provinceMap[entry['region_name']] = entry
			except Exception as e:
				continue
	except Exception as e:
		# Missing/invalid response: nothing usable came back.
		return False
	return True


def searchCity(region_id, cityName):
	"""Find the first sub-region of ``region_id`` whose name contains ``cityName``.

	Args:
		region_id: Parent region id; converted with ``int()`` before sending.
		cityName: Substring to look for in each region's ``region_name``.

	Returns:
		The matching region dict from the response, or ``None`` when there is
		no match or the response cannot be read.
	"""
	endpoint = "https://api.pinduoduo.com/api/mc/v1/user/regions"
	payload = json.dumps({
		"open_app_source": 1089,
		"anti_content": getAntiContent(),
		"region_id": int(region_id),
		"xcx_version": "0.0.64"
	})
	reply = postHtml(endpoint, payload)
	try:
		for candidate in reply['regions']:
			try:
				matched = cityName in candidate['region_name']
			except Exception as e:
				# Entry without a usable name: ignore it.
				continue
			if matched:
				return candidate
	except Exception as e:
		pass
	return None


def searchPoi(provinceId, cityId, districtId, key):
	"""Query the ``search_poi`` endpoint and return its ``poi_list``.

	Args:
		provinceId: Province id; converted with ``int()``.
		cityId: City id; converted with ``int()``.
		districtId: District id; converted with ``int()``.
		key: Search text; converted with ``str()`` and sent as ``query``.

	Returns:
		The ``poi_list`` value of the response, or ``None`` when the response
		does not provide one.
	"""
	endpoint = "https://api.pinduoduo.com/api/mc/v1/search_poi"
	payload = json.dumps({
		"open_app_source": 1089,
		"anti_content": getAntiContent(),
		"provinceId": int(provinceId),
		"query": str(key),
		"cityId": int(cityId),
		"districtId": int(districtId),
		"xcx_version": "0.0.64"
	})
	reply = postHtml(endpoint, payload)
	try:
		return reply['poi_list']
	except Exception as e:
		return None


def getStore(provinceId, cityId, key):
	"""Return the first store found for ``key`` in any sub-region of a city.

	Lists the sub-regions of ``cityId``, searches each one for ``key`` with
	``searchPoi`` and passes every resulting ``poi_id`` to ``searchStore``
	until one returns a truthy value. A failure for one POI or one sub-region
	only skips that item.

	Args:
		provinceId: Province id forwarded to ``searchPoi``.
		cityId: City id; converted with ``int()`` for the regions request.
		key: Search text forwarded to ``searchPoi``.

	Returns:
		The first truthy ``searchStore`` result, or ``None`` when nothing is
		found or the regions response cannot be read.
	"""
	endpoint = "https://api.pinduoduo.com/api/mc/v1/user/regions"
	payload = json.dumps({
		"open_app_source": 1089,
		"anti_content": getAntiContent(),
		"region_id": int(cityId),
		"xcx_version": "0.0.64"
	})
	reply = postHtml(endpoint, payload)
	try:
		for district in reply['regions']:
			try:
				pois = searchPoi(provinceId, cityId, district['region_id'], key)
				if not isinstance(pois, list) or not pois:
					continue
				for poi in pois:
					try:
						found = searchStore(poi['poi_id'])
						if found:
							return found
					except Exception as e:
						continue
			except Exception as e:
				continue
	except Exception as e:
		pass
	return None


def _bracketLeadingWord(goods_name):
	"""Wrap only the first space-separated word of ``goods_name`` in 【】.

	Names that already contain 【 or 】, contain no space, or start with a
	space are returned unchanged.
	"""
	if "【" in goods_name or "】" in goods_name:
		return goods_name
	head, sep, tail = goods_name.partition(" ")
	if not sep or not head:
		return goods_name
	# Rebuild from the partition rather than str.replace(): replace() would
	# also wrap later occurrences of the same word inside the name.
	return "【" + head + "】" + sep + tail


def getGoodsDetail(store_id, goods_id, city):
	"""Fetch one product's detail and map it to a flat record dict.

	Args:
		store_id: Store id; sent as a string.
		goods_id: Product id; sent as a string.
		city: City label stored in ``area`` and used as the prefix of
			``qy_address``.

	Returns:
		A dict with the keys ``goods_id``, ``area``, ``goods_name``,
		``sc_price``, ``ysj_price``, ``xg_num``, ``xs_nums``, ``start_time``,
		``end_time``, ``qy_address``, ``imageb_url`` and ``sy_image``. Any
		field that cannot be derived falls back to ``""``/``0``. Returns
		``None`` when no ``goods_id`` can be built from the response.
	"""
	url = "https://api.pinduoduo.com/api/mc/v0/goods_detail"
	data = {
		"open_app_source": 1089,
		"anti_content": getAntiContent(),
		"store_id": str(store_id),
		"goods_id": str(goods_id),
		"xcx_version": "0.0.64"
	}
	res = postHtml(url, json.dumps(data))
	datas = {}
	try:
		# NOTE(review): `appflag` is not defined in the visible code; if it is
		# not a module global, this always takes the except branch.
		datas['goods_id'] = int(appflag + str(res['goods_id']))
	except Exception as e:
		# Without an id the record cannot be stored; the caller gets None.
		return None
	datas['area'] = city
	try:
		datas['goods_name'] = _bracketLeadingWord(str(res['goods_name']))
	except Exception as e:
		datas['goods_name'] = ""
	# Prices are divided by 100 and rounded to two decimals
	# (presumably fen -> yuan; confirm against the API).
	for field, source in (('sc_price', 'market_price'), ('ysj_price', 'price')):
		try:
			datas[field] = float("%.2f" % (float(res[source]) / 100))
		except Exception as e:
			datas[field] = 0.00
	try:
		datas['xg_num'] = res['regular_limit']
	except Exception as e:
		datas['xg_num'] = 0
	try:
		# NOTE(review): `sellNum` is not defined in the visible code; unless it
		# is a module global, this field is always 0.
		datas['xs_nums'] = sellNum
	except Exception as e:
		datas['xs_nums'] = 0
	for field, source in (('start_time', 'pre_sale_time'), ('end_time', 'end_sale_time')):
		try:
			datas[field] = int(res[source])
		except Exception as e:
			datas[field] = 0
	try:
		datas['qy_address'] = city + "多多买菜"
	except Exception as e:
		datas['qy_address'] = ""
	try:
		# NOTE(review): `detailPre` is not defined in the visible code.
		datas['imageb_url'] = detailPre + str(datas['goods_id'])
	except Exception as e:
		datas['imageb_url'] = ""
	try:
		# Keep only the part of the image URL before any query string.
		datas['sy_image'] = res['image_url'].split("?", 1)[0]
	except Exception as e:
		datas['sy_image'] = ""
	return datas


def checkGoodsExists(pid):
	"""Report whether ``goods_list`` already has a row with this ``goods_id``.

	Args:
		pid: Product id; converted with ``int()`` before querying.

	Returns:
		``True`` when a matching row exists. ``False`` when none exists, when
		``pid`` is not an integer, or when the database call fails.
	"""
	conn = None
	try:
		conn = MySQLdb.connect(user=mysql_user, password=mysql_password, database=mysql_database, charset='utf8',
							   host=mysql_host)
		cursor = conn.cursor()
		# Bound parameter instead of string formatting; one row is enough to
		# answer an existence check.
		cursor.execute(
			"select 1 from goods_list where goods_id = %s limit 1", (int(pid),)
		)
		return cursor.fetchone() is not None
	except Exception as e:
		return False
	finally:
		# A new connection is opened on every call, so release it here.
		if conn is not None:
			try:
				conn.close()
			except Exception as e:
				# Best-effort cleanup; the result is already decided.
				pass


def add(data):
	"""Print ``data`` and execute the insert statement for it.

	Database errors are reported on stdout and never raised to the caller.

	Args:
		data: Record to insert; printed as-is.
	"""
	print("insert ----------------------------------------------------")
	print(data)
	conn = None
	try:
		conn = MySQLdb.connect(user=mysql_user, host=mysql_host, password=mysql_password, database=mysql_database,
							   charset='utf8')
		cursor = conn.cursor()
		# NOTE(review): the statement is empty in this source, so nothing can
		# be inserted until the real INSERT (with bound parameters) is
		# supplied here.
		sql = ""
		cursor.execute(sql)
		conn.commit()
	except Exception as e:
		# Report rather than hide the failure; still never raise to the caller.
		print("insert failed: %s" % e)
	finally:
		# A new connection is opened on every call, so release it here.
		if conn is not None:
			try:
				conn.close()
			except Exception as e:
				pass


def update(data):
	"""Print ``data`` and execute the update statement for it.

	Database errors are reported on stdout and never raised to the caller.

	Args:
		data: Record to update; printed as-is.
	"""
	print("update ----------------------------------------------------")
	print(data)
	conn = None
	try:
		conn = MySQLdb.connect(user=mysql_user, host=mysql_host, password=mysql_password, database=mysql_database,
							   charset='utf8')
		cursor = conn.cursor()
		# NOTE(review): the statement is empty in this source, so nothing can
		# be updated until the real UPDATE (with bound parameters) is
		# supplied here.
		sql = ""
		cursor.execute(sql)
		conn.commit()
	except Exception as e:
		# Report rather than hide the failure; still never raise to the caller.
		print("update failed: %s" % e)
	finally:
		# A new connection is opened on every call, so release it here.
		if conn is not None:
			try:
				conn.close()
			except Exception as e:
				pass


def parser(storeId, city):
	"""Page through a store's goods list and store every product.

	Requests the goods list ten items at a time. For each product it fetches
	the detail record, then calls ``update`` when ``checkGoodsExists`` reports
	the id as present and ``add`` otherwise. Paging stops when the response
	reports no more items or when a page cannot be fetched or read.

	Args:
		storeId: Store id; converted with ``int()`` for the request.
		city: City label forwarded to ``getGoodsDetail``.
	"""
	pageSize = 10
	page = 0
	url = "https://api.pinduoduo.com/api/mc/v0/goods_list"
	while True:
		try:
			data = {
				"open_app_source": 1089,
				"anti_content": getAntiContent(),
				"store_id": int(storeId),
				"list_id": "0d95f10a-620f-4d29-a087-894ff90239a4",
				"offset": page * pageSize,
				"count": pageSize,
				"xcx_version": "0.0.64"
			}
			res = postHtml(url, json.dumps(data))
			has_more = res['has_more']
			goods_list = res['goods_list']
			for goods in goods_list:
				try:
					datas = getGoodsDetail(storeId, goods['goods_id'], city)
					if not datas:
						# getGoodsDetail returns None when it cannot build a
						# record; there is nothing to store for this item.
						continue
					if checkGoodsExists(datas['goods_id']):
						update(datas)
					else:
						add(datas)
				except Exception as e:
					# One bad item must not stop the rest of the page.
					print("goods skipped: %s" % e)
			if not has_more:
				break
			page += 1
			time.sleep(getSleepTime())
		except Exception as e:
			# Unusable page (request failed or unexpected shape): stop paging.
			break


def main():
	"""Crawl every configured keyword group.

	Loads the province table first, then for each keyword group resolves
	province -> city -> store and hands the store to ``parser``. A failure in
	one keyword group is reported and does not stop the others.
	"""
	global provinceMap
	# provinceMap is empty until iniProvinceMap() fills it, and every lookup
	# below depends on it. A False result means the regions request did not
	# return usable data.
	if not iniProvinceMap():
		print("登录过期!")
		return
	cityMaps = getCityMaps()
	if not cityMaps:
		print("获取城市列表失败!")
		return
	for cityMap in cityMaps:
		try:
			bcity = cityMaps[cityMap]
			cityName = bcity['city']
			key = bcity['key']
			scity = bcity['scity']
			store = None
			# Resolve step by step; an unknown province or city simply leaves
			# `store` empty and is reported like any other miss.
			province = provinceMap.get(cityMap)
			if province:
				provinceId = province['region_id']
				acity = searchCity(provinceId, cityName)
				if acity:
					store = getStore(provinceId, acity['region_id'], key)
			if store:
				parser(store['store_id'], scity)
			else:
				print("关键词组:%s 未搜索到任何店铺!" % (cityMap + " - " + cityName + " - " + key))
		except Exception as e:
			# Keep going with the next keyword group, but do not hide the error.
			print("关键词组:%s 处理失败: %s" % (cityMap, e))


if __name__ == '__main__':
	main()

人已赞赏
Python

2W字诚意满满的新活:常见接口测试69道面试题,附带答案

2020-11-24 14:14:43

thinkPHP

ThinkPHP 模型方法 setInc() 和 setDec() 使用详解

2020-11-21 16:42:00

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠券
今日签到
有新私信 私信列表
搜索