中文字幕日韩精品一区二区免费_精品一区二区三区国产精品无卡在_国精品无码专区一区二区三区_国产αv三级中文在线

python爬取基于m3u8協(xié)議的ts文件并合并-創(chuàng)新互聯(lián)

前言

目前創(chuàng)新互聯(lián)建站已為上1000+的企業(yè)提供了網(wǎng)站建設(shè)、域名、虛擬空間、綿陽服務(wù)器托管、企業(yè)網(wǎng)站設(shè)計、龍南網(wǎng)站維護等服務(wù),公司將堅持客戶導(dǎo)向、應(yīng)用為本的策略,正道將秉承"和諧、參與、激情"的文化,與客戶和合作伙伴齊心協(xié)力一起成長,共同發(fā)展。

簡單學(xué)習(xí)過網(wǎng)絡(luò)爬蟲,只是之前都是照著書上做并發(fā),大概能理解,卻還是無法自己用到自己項目中,這里自己研究實現(xiàn)一個網(wǎng)頁嗅探HTML5播放控件中基于m3u8協(xié)議ts格式視頻資源的項目,并未考慮過復(fù)雜情況,畢竟只是練練手.

源碼

# coding=utf-8
import asyncio
import multiprocessing
import os
import re
import time
from math import floor
from multiprocessing import Manager
import aiohttp
import requests
from lxml import html
import threading
from src.my_lib import retry
from src.my_lib import time_statistics


class M3U8Download:
 _path = "./resource\\" # 本地文件路徑
 _url_seed = None # 資源所在鏈接前綴
 _target_url = {} # 資源任務(wù)目標字典
 _mode = ""
 _headers = {"User-agent": "Mozilla/5.0"} # 瀏覽器代理
 _target_num = 100

 def __init__(self):
 self._ml = Manager().list() # 進程通信列表
 if not os.path.exists(self._path): # 檢測本地目錄存在否
  os.makedirs(self._path)
 exec_str = r'chcp 65001'
 os.system(exec_str) # 先切換utf-8輸出,防止控制臺亂碼

 def sniffing(self, url):
 self._url = url
 print("開始嗅探...")
 try:
  r = requests.get(self._url) # 訪問嗅探網(wǎng)址,獲取網(wǎng)頁信息
 except:
  print("嗅探失敗,網(wǎng)址不正確")
  os.system("pause")
 else:
  tree = html.fromstring(r.content)
  try:
  source_url = tree.xpath('//video//source/@src')[0] # 嗅探資源控制文件鏈接,這里只針對一個資源控制文件
  # self._url_seed = re.split("/\w+\.m3u8", source_url)[0] # 從資源控制文件鏈接解析域名
  except:
  print("嗅探失敗,未發(fā)現(xiàn)資源")
  os.system("pause")
  else:
  self.analysis(source_url)

 def analysis(self, source_url):
 try:
  self._url_seed = re.split("/\w+\.m3u8", source_url)[0] # 從資源控制文件鏈接解析域名
  with requests.get(source_url) as r: # 訪問資源控制文件,獲得資源信息
  src = re.split("\n*#.+\n", r.text) # 解析資源信息
  for sub_src in src: # 將資源地址儲存到任務(wù)字典
   if sub_src:
   self._target_url[sub_src] = self._url_seed + "/" + sub_src
 except Exception as e:
  print("資源無法成功解析", e)
  os.system("pause")
 else:
  self._target_num = len(self._target_url)
  print("sniffing success!!!,found", self._target_num, "url.")
  self._mode = input(
  "1:-> 單進程(Low B)\n2:-> 多進程+多線程(網(wǎng)速開始biubiu飛起!)\n3:-> 多進程+協(xié)程(最先進的并發(fā)!!!)\n")
  if self._mode == "1":
  for path, url in self._target_url.items():
   self._download(path, url)
  elif self._mode == "2" or self._mode == "3":
  self._multiprocessing()

 def _multiprocessing(self, processing_num=4): # 多進程,多線程
 target_list = {} # 進程任務(wù)字典,儲存每個進程分配的任務(wù)
 pool = multiprocessing.Pool(processes=processing_num) # 開啟進程池
 i = 0 # 任務(wù)分配標識
 for path, url in self._target_url.items(): # 分配進程任務(wù)
  target_list[path] = url
  i += 1
  if i % 10 == 0 or i == len(self._target_url): # 每個進程分配十個任務(wù)
  if self._mode == "2":
   pool.apply_async(self._sub_multithreading, kwds=target_list) # 使用多線程驅(qū)動方法
  else:
   pool.apply_async(self._sub_coroutine, kwds=target_list) # 使用協(xié)程驅(qū)動方法
  target_list = {}
 pool.close() # join函數(shù)等待所有子進程結(jié)束
 pool.join() # 調(diào)用join之前,先調(diào)用close函數(shù),否則會出錯。執(zhí)行完close后不會有新的進程加入到pool
 while True:
  if self._judge_over():
  self._combine()
  break

 def _sub_multithreading(self, **kwargs):
 for path, url in kwargs.items(): # 根據(jù)進程任務(wù)開啟線程
  t = threading.Thread(target=self._download, args=(path, url,))
  t.start()

 @retry()
 def _download(self, path, url): # 同步下載方法
 with requests.get(url, headers=self._headers) as r:
  if r.status_code == 200:
  with open(self._path + path, "wb")as file:
   file.write(r.content)
  self._ml.append(0) # 每成功一個就往進程通信列表增加一個值
  percent = '%.2f' % (len(self._ml) / self._target_num * 100)
  print(len(self._ml), ": ", path, "->OK", "\tcomplete:", percent, "%") # 顯示下載進度
  else:
  print(path, r.status_code, r.reason)

 def _sub_coroutine(self, **kwargs):
 tasks = []
 for path, url in kwargs.items(): # 根據(jù)進程任務(wù)創(chuàng)建協(xié)程任務(wù)列表
  tasks.append(asyncio.ensure_future(self._async_download(path, url)))
 loop = asyncio.get_event_loop() # 創(chuàng)建異步事件循環(huán)
 loop.run_until_complete(asyncio.wait(tasks)) # 注冊任務(wù)列表

 async def _async_download(self, path, url): # 異步下載方法
 async with aiohttp.ClientSession() as session:
  async with session.get(url, headers=self._headers) as resp:
  try:
   assert resp.status == 200, "E" # 斷言狀態(tài)碼為200,否則拋異常,觸發(fā)重試裝飾器
   with open(self._path + path, "wb")as file:
   file.write(await resp.read())
  except Exception as e:
   print(e)
  else:
   self._ml.append(0) # 每成功一個就往進程通信列表增加一個值
   percent = '%.2f' % (len(self._ml) / self._target_num * 100)
   print(len(self._ml), ": ", path, "->OK", "\tcomplete:", percent, "%") # 顯示下載進度

 def _combine(self): # 組合資源方法
 try:
  print("開始組合資源...")
  identification = str(floor(time.time()))
  exec_str = r'copy /b "' + self._path + r'*.ts" "' + self._path + 'video' + identification + '.mp4"'
  os.system(exec_str) # 使用cmd命令將資源整合
  exec_str = r'del "' + self._path + r'*.ts"'
  os.system(exec_str) # 刪除原來的文件
 except:
  print("資源組合失敗")
 else:
  print("資源組合成功!")

 def _judge_over(self): # 判斷是否全部下載完成
 if len(self._ml) == len(self._target_url):
  return True
 return False


@time_statistics
def app():
 multiprocessing.freeze_support()
 url = input("輸入嗅探網(wǎng)址:\n")
 m3u8 = M3U8Download()
 m3u8.sniffing(url)
 # m3u8.analysis(url)


if __name__ == "__main__":
 app()

網(wǎng)頁標題:python爬取基于m3u8協(xié)議的ts文件并合并-創(chuàng)新互聯(lián)
文章地址:http://www.rwnh.cn/article2/ddcooc.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供小程序開發(fā)、響應(yīng)式網(wǎng)站品牌網(wǎng)站設(shè)計、移動網(wǎng)站建設(shè)Google、商城網(wǎng)站

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

網(wǎng)站優(yōu)化排名
武定县| 确山县| 全椒县| 昆明市| 绥棱县| 鄂尔多斯市| 景东| 莲花县| 兴文县| 屯留县| 祁连县| 铅山县| 颍上县| 宜州市| 呼和浩特市| 岐山县| 陕西省| 海南省| 九寨沟县| 吉木乃县| 醴陵市| 忻城县| 大厂| 南涧| 银川市| 德钦县| 栖霞市| 那曲县| 宁国市| 佛坪县| 衡南县| 微山县| 淅川县| 调兵山市| 乌兰县| 天峻县| 玛沁县| 荥经县| 保康县| 扶沟县| 沙坪坝区|