Assignment ①:
Requirement: Pick a website and crawl all of the images on it, for example the China Weather Network (http://www.weather.com.cn). Implement the crawl in both a single-threaded and a multi-threaded way.
1) Single-threaded code (the script below also includes a small thread-pool variant so the two approaches can be compared directly):
import os
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin
from concurrent.futures import ThreadPoolExecutor

# Basic configuration
TARGET_URL = "http://www.weather.com.cn"
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
}

def fetch_page(url):
    """Fetch the HTML of a page; return None on failure."""
    try:
        response = requests.get(url, headers=HEADERS, timeout=10)
        response.raise_for_status()
        return response.text
    except requests.RequestException as e:
        print(f"Failed to fetch page: {e}")
        return None

def extract_image_urls(html, base_url):
    """Collect the absolute URLs of all <img> tags on the page."""
    soup = BeautifulSoup(html, 'html.parser')
    img_tags = soup.find_all('img')
    img_urls = set()

    for img in img_tags:
        img_url = img.get('src')
        if img_url:
            absolute_url = urljoin(base_url, img_url)
            img_urls.add(absolute_url)

    return img_urls

def download_image(img_url):
    """Download a single image into the images/ directory."""
    try:
        print(f"Downloading: {img_url}")
        response = requests.get(img_url, headers=HEADERS, timeout=10)
        response.raise_for_status()

        filename = os.path.join("images", img_url.split("/")[-1])
        with open(filename, 'wb') as f:
            f.write(response.content)
        print(f"Saved: {filename}")
    except Exception as e:
        print(f"Download failed: {img_url} error: {e}")

def single_threaded_crawler():
    """Download the images one by one in the main thread."""
    print("=== Single-threaded crawler started ===")
    html = fetch_page(TARGET_URL)
    if html:
        img_urls = extract_image_urls(html, TARGET_URL)
        os.makedirs("images", exist_ok=True)

        for url in img_urls:
            download_image(url)
    print("=== Single-threaded crawler finished ===")

def multi_threaded_crawler():
    """Download the images concurrently with a small thread pool."""
    print("=== Multi-threaded crawler started ===")
    html = fetch_page(TARGET_URL)
    if html:
        img_urls = extract_image_urls(html, TARGET_URL)
        os.makedirs("images", exist_ok=True)

        with ThreadPoolExecutor(max_workers=5) as executor:
            executor.map(download_image, img_urls)
    print("=== Multi-threaded crawler finished ===")

if __name__ == "__main__":
    single_threaded_crawler()
    multi_threaded_crawler()
Output:
Multi-threaded code:
import requests
from bs4 import BeautifulSoup
import os
import time
import threading
from urllib.parse import urljoin, urlparse
from queue import Queue, Empty

class ConcurrentImageScraper:
    def __init__(self, start_url, page_limit=24, image_limit=124, worker_count=5):
        self.start_url = start_url
        self.page_limit = page_limit
        self.image_limit = image_limit
        self.worker_count = worker_count
        self.images_downloaded = 0
        self.processed_pages = set()
        self.url_queue = Queue()
        self.thread_lock = threading.Lock()
        self.http_session = requests.Session()
        self.http_session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
        # Create the image storage directory
        self.storage_dir = 'downloaded_images'
        os.makedirs(self.storage_dir, exist_ok=True)
        # Seed the queue with the start URL
        self.url_queue.put(start_url)
        self.processed_pages.add(start_url)

    def validate_url(self, url):
        """Check that a URL has both a scheme and a host."""
        parsed = urlparse(url)
        return all([parsed.netloc, parsed.scheme])

    def fetch_image(self, image_url, source_page):
        """Download and save a single image."""
        with self.thread_lock:
            if self.images_downloaded >= self.image_limit:
                return False
        try:
            # Resolve relative paths against the page the image came from
            final_url = image_url if image_url.startswith(('http://', 'https://')) \
                else urljoin(source_page, image_url)
            if not self.validate_url(final_url):
                return False
            # Only keep URLs with a recognised image extension
            supported_formats = ('.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp')
            if not final_url.lower().endswith(supported_formats):
                return False
            print(f"{threading.current_thread().name} fetching image: {final_url}")
            response = self.http_session.get(final_url, timeout=10)
            response.raise_for_status()
            # Build a file name, falling back to a sequential name
            filename = os.path.basename(urlparse(final_url).path) or f"img_{self.images_downloaded + 1}.jpg"
            save_path = os.path.join(self.storage_dir, filename)

            # Resolve file-name collisions
            counter = 1
            while os.path.exists(save_path):
                name, ext = os.path.splitext(filename)
                save_path = os.path.join(self.storage_dir, f"{name}_{counter}{ext}")
                counter += 1
            # Write the image to disk
            with open(save_path, 'wb') as file:
                file.write(response.content)
            with self.thread_lock:
                self.images_downloaded += 1
                progress = self.images_downloaded
            print(f"{threading.current_thread().name} saved: {filename} (progress: {progress}/{self.image_limit})")
            return True
        except Exception as error:
            print(f"{threading.current_thread().name} failed to fetch image {image_url}: {error}")
            return False

    def parse_page(self, page_url):
        """Parse one page: download its images and enqueue further links."""
        print(f"{threading.current_thread().name} parsing: {page_url}")
        try:
            response = self.http_session.get(page_url, timeout=10)
            response.raise_for_status()
            response.encoding = 'utf-8'
            page_content = BeautifulSoup(response.text, 'html.parser')
            # Extract image links
            image_elements = page_content.find_all('img')
            for img in image_elements:
                with self.thread_lock:
                    if self.images_downloaded >= self.image_limit:
                        return
                image_src = img.get('src') or img.get('data-src')
                if image_src:
                    self.fetch_image(image_src, page_url)
            # Extract links to further pages
            with self.thread_lock:
                if len(self.processed_pages) >= self.page_limit:
                    return
            link_elements = page_content.find_all('a', href=True)
            for link in link_elements[:8]:  # Limit the number of links followed per page
                with self.thread_lock:
                    if self.images_downloaded >= self.image_limit or \
                            len(self.processed_pages) >= self.page_limit:
                        return
                next_page = link['href']
                if not next_page.startswith('http'):
                    next_page = urljoin(page_url, next_page)
                if self.start_url in next_page and \
                        next_page not in self.processed_pages and \
                        len(self.processed_pages) < self.page_limit:
                    with self.thread_lock:
                        if next_page not in self.processed_pages:
                            self.processed_pages.add(next_page)
                            self.url_queue.put(next_page)
        except Exception as error:
            print(f"{threading.current_thread().name} error parsing page {page_url}: {error}")

    def task_executor(self):
        """Worker loop: keep taking page URLs off the queue until the limits are hit."""
        while True:
            with self.thread_lock:
                if self.images_downloaded >= self.image_limit or \
                        (self.url_queue.empty() and len(self.processed_pages) >= self.page_limit):
                    break
            try:
                current_url = self.url_queue.get(timeout=5)
            except Empty:
                break
            try:
                self.parse_page(current_url)
            finally:
                self.url_queue.task_done()

    def run_scraper(self):
        """Start the crawler."""
        print("Starting multi-threaded crawler...")
        print(f"Start URL: {self.start_url}")
        print(f"Page limit: {self.page_limit}")
        print(f"Image limit: {self.image_limit}")
        print(f"Worker threads: {self.worker_count}")
        print("=" * 50)
        start = time.time()
        # Create the worker threads
        workers = []
        for idx in range(self.worker_count):
            worker = threading.Thread(
                target=self.task_executor,
                name=f"Worker-{idx + 1}",
                daemon=True
            )
            worker.start()
            workers.append(worker)
        # Wait for the workers to finish. They exit once the limits are
        # reached or the queue stays empty; joining the threads instead of
        # calling url_queue.join() avoids hanging when the image limit is
        # reached while URLs are still queued.
        for worker in workers:
            worker.join()
        duration = time.time() - start
        print("=" * 50)
        print("Crawl finished!")
        print(f"Total time: {duration:.2f}s")
        print(f"Pages processed: {len(self.processed_pages)}")
        print(f"Images downloaded: {self.images_downloaded}")

# Example usage
if __name__ == "__main__":
    scraper = ConcurrentImageScraper(
        start_url="http://www.weather.com.cn",
        page_limit=24,
        image_limit=124,
        worker_count=5
    )
    scraper.run_scraper()
Output:
2) Reflections:
The single-threaded crawler is simple to implement and easy to follow. The multi-threaded crawler adds complexity (shared state, a lock, a work queue), but it is noticeably faster because requests overlap while waiting on the network. A small timing harness, sketched below, makes the difference concrete.
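To put a number on that difference, a minimal timing harness like the following can be wrapped around the two crawler functions defined in the first code block. It assumes that script has been saved as crawler.py (an illustrative file name, not part of the original code); the actual speed-up depends on network latency and how many images the page contains.

import time

# Assumes the first script above is saved as crawler.py in the same directory
from crawler import single_threaded_crawler, multi_threaded_crawler

def timed(label, func):
    """Run one crawler and report its wall-clock time."""
    start = time.perf_counter()
    func()
    elapsed = time.perf_counter() - start
    print(f"{label}: {elapsed:.2f}s")
    return elapsed

if __name__ == "__main__":
    t_single = timed("single-threaded", single_threaded_crawler)
    t_multi = timed("multi-threaded (5 workers)", multi_threaded_crawler)
    print(f"speed-up: {t_single / t_multi:.1f}x")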
Assignment ②
Requirement: Become proficient with serialized output of Item and Pipeline data in Scrapy; crawl stock information using the Scrapy framework + XPath + MySQL storage.
1) Code:
import scrapy
import json

class EastmoneyStockSpider(scrapy.Spider):
    name = 'eastmoney_stock_spider'

    def initiate_requests(self):
        # Eastmoney A-share list API
        api_endpoints = [
            'http://82.push2.eastmoney.com/api/qt/clist/get?pn=1&pz=20&po=1&np=1&ut=bd1d9ddb04089700cf9c27f6f7426281&fltt=2&invt=2&fid=f3&fs=m:0+t:6,m:0+t:13,m:0+t:80,m:1+t:2,m:1+t:23&fields=f1,f2,f3,f4,f5,f6,f7,f8,f9,f10,f12,f13,f14,f15,f16,f17,f18,f20,f21,f23,f24,f25,f22,f11,f62,f128,f136,f115,f152',
        ]

        for endpoint in api_endpoints:
            yield scrapy.Request(
                url=endpoint,
                callback=self.process_api_response,
                meta={'page_number': 1}
            )

    def process_api_response(self, response):
        try:
            response_data = json.loads(response.text)
            # The "diff" field may come back as a list or as a dict keyed by index;
            # normalise it to a list of per-stock dicts before iterating.
            stock_list = (response_data.get('data') or {}).get('diff') or []
            if isinstance(stock_list, dict):
                stock_list = list(stock_list.values())

            for idx, details in enumerate(stock_list, start=1):
                yield {
                    'rank': idx,
                    'code': details.get('f12', 'N/A'),
                    'name': details.get('f14', 'N/A'),
                    'price': details.get('f2', 'N/A'),
                    'change_percent': f"{details.get('f3', 0)}%",
                    'change_value': details.get('f4', 'N/A'),
                    'trading_volume': details.get('f5', 'N/A'),
                    'trading_value': details.get('f6', 'N/A'),
                    'price_range': f"{details.get('f7', 0)}%",
                    'daily_high': details.get('f15', 'N/A'),
                    'daily_low': details.get('f16', 'N/A'),
                    'opening_price': details.get('f17', 'N/A'),
                    'previous_closing': details.get('f18', 'N/A'),
                }
        except json.JSONDecodeError:
            self.logger.error(f"Failed to parse JSON response from {response.url}")
        except Exception as e:
            self.logger.error(f"Error processing response: {str(e)}")

    def start_requests(self):
        return self.initiate_requests()
Output:
2) Reflections:
In this exercise of collecting stock data from Eastmoney, I learned how to organize a project around the Spider, Item, Pipeline, and Middleware components, and I gained a deeper understanding of how the Scrapy framework runs. A sketch of the Item and MySQL pipeline pieces the assignment asks for is given below.
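Since the assignment calls for Item/Pipeline serialization and MySQL storage but the spider above yields plain dicts, here is a minimal sketch of what those two pieces could look like. All names here (StockItem, StockMySQLPipeline, the connection parameters and table name) are illustrative assumptions, not part of the original code, and only a few representative columns are stored.

# items.py (field names mirror the keys yielded by the spider above)
import scrapy

class StockItem(scrapy.Item):
    rank = scrapy.Field()
    code = scrapy.Field()
    name = scrapy.Field()
    price = scrapy.Field()
    change_percent = scrapy.Field()
    change_value = scrapy.Field()
    trading_volume = scrapy.Field()
    trading_value = scrapy.Field()
    price_range = scrapy.Field()
    daily_high = scrapy.Field()
    daily_low = scrapy.Field()
    opening_price = scrapy.Field()
    previous_closing = scrapy.Field()

# pipelines.py (writes each item into a MySQL table via pymysql)
import pymysql

class StockMySQLPipeline:
    def open_spider(self, spider):
        # Connection parameters are placeholders; adjust them for your environment
        self.conn = pymysql.connect(host='localhost', user='root', password='123456',
                                    database='stocks_db', charset='utf8mb4')
        self.cursor = self.conn.cursor()
        self.cursor.execute("""
            CREATE TABLE IF NOT EXISTS stocks (
                id INT AUTO_INCREMENT PRIMARY KEY,
                code VARCHAR(16), name VARCHAR(64), price VARCHAR(16),
                change_percent VARCHAR(16), trading_volume VARCHAR(32)
            )
        """)

    def process_item(self, item, spider):
        self.cursor.execute(
            "INSERT INTO stocks (code, name, price, change_percent, trading_volume) "
            "VALUES (%s, %s, %s, %s, %s)",
            (item.get('code'), item.get('name'), str(item.get('price')),
             item.get('change_percent'), str(item.get('trading_volume'))))
        self.conn.commit()
        return item

    def close_spider(self, spider):
        self.cursor.close()
        self.conn.close()

The pipeline would then be enabled in settings.py with something like ITEM_PIPELINES = {'<project>.pipelines.StockMySQLPipeline': 300}, and the spider could yield StockItem(...) instead of plain dicts.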
Assignment ③:
Requirement: Become proficient with serialized output of Item and Pipeline data in Scrapy; crawl foreign-exchange data using the Scrapy framework + XPath + MySQL storage.
1) Code:
import scrapy
from boc_forex.items import BocForexItem

class BankOfChinaForexSpider(scrapy.Spider):
    name = 'boc_forex_rates'
    allowed_domains = ['boc.cn']
    start_urls = ['https://www.boc.cn/sourcedb/whpj/']

    def parse(self, response, **kwargs):
        # Select the rows of the exchange-rate table (skipping the header row)
        currency_rows = response.xpath('//table[contains(@align, "left")]/tr[position() > 1]')

        for row in currency_rows:
            yield BocForexItem(
                currency=row.xpath('./td[1]/text()').get(default='').strip(),
                tbp=row.xpath('./td[2]/text()').get(default='').strip(),              # spot exchange buying price
                cash_buy_price=row.xpath('./td[3]/text()').get(default='').strip(),   # cash buying price
                tsp=row.xpath('./td[4]/text()').get(default='').strip(),              # spot exchange selling price
                cash_sell_price=row.xpath('./td[5]/text()').get(default='').strip(),  # cash selling price
                publish_time=row.xpath('./td[7]/text()').get(default='').strip()      # publish time
            )

    def handle_error(self, failure):
        # Optional error callback; not wired up here. Attach it with
        # errback=self.handle_error in a custom start_requests if needed.
        self.logger.error(f"Request failed: {failure.request.url}")
Output:
2) Reflections:
In this exercise of collecting the Bank of China foreign-exchange quotations, I learned how time parameters are passed in the URL, and by using XPath selectors I learned how to handle unstructured web-page data more efficiently. The Item definition the spider relies on is sketched below.
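The spider above imports BocForexItem from boc_forex.items but the Item itself is not shown. A matching definition could look like the sketch below; the field names mirror the keyword arguments used in parse, the MySQL pipeline would follow the same pattern as in Assignment ②, and the pipeline class name in the settings comment is hypothetical.

# boc_forex/items.py (fields match the keyword arguments used in the spider)
import scrapy

class BocForexItem(scrapy.Item):
    currency = scrapy.Field()         # currency name
    tbp = scrapy.Field()              # spot exchange buying price
    cash_buy_price = scrapy.Field()   # cash buying price
    tsp = scrapy.Field()              # spot exchange selling price
    cash_sell_price = scrapy.Field()  # cash selling price
    publish_time = scrapy.Field()     # publish time

# boc_forex/settings.py (enable a MySQL pipeline analogous to Assignment ②)
# ITEM_PIPELINES = {'boc_forex.pipelines.ForexMySQLPipeline': 300}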