【www.bbyears.com--python】
今天把刚学python时收藏的几篇多线程下载文件的网页看了一下。
实现简单的多线程下载,需要关注如下几点:
1.文件的大小:可以从response header中提取,如“Content-Length:911”表示大小是911字节
2.任务拆分:指定各个线程下载的文件的哪一块,可以通过request header中添加“Range: bytes=300-400”(表示下载300~400byte的内容),注意可以请求的文件的range是[0, size-1]字节的。
3.下载文件的聚合:各个线程将自己下载的文件块保存为临时文件,所有线程都完成后,再将这些临时文件按顺序聚合写入到最终的一个文件中。
网上看到一个不错的多线程下载文件的例子,是根据Linux上多线程下载工具axel的思想来写的,源代码在:http://fayaa.com/code/view/58/full/
我将其收录起来,后续需要时可以基于它来修改,示例代码如下:
https://github.com/smilejay/python/blob/master/py2014/paxel.py
# -*- coding: utf-8 -*-
# filename: paxel.py
# FROM: http://fayaa.com/code/view/58/full/
# Jay modified it a little and save for further potential usage.
"""It is a multi-thread downloading tool
It was developed following axel.
Author: volans
E-mail: volansw [at] gmail.com
"""
import sys
import os
import time
import urllib
from threading import Thread
# Optional HTTP proxy mapping handed to FancyURLopener; replace the
# address with your own proxy, or pass proxies={} to paxel() to connect
# directly without one.
local_proxies = {"http": "http://131.139.58.200:8080"}
class AxelPython(Thread, urllib.FancyURLopener):
    """Multi-thread downloading worker.

    Each instance downloads one inclusive byte range of *url* into its
    own temporary file via an HTTP Range request; run() is the Thread
    entry point.  (Python 2 code: FancyURLopener does not exist on 3.)
    """

    def __init__(self, threadname, url, filename, ranges=0, proxies={}):
        # NOTE(review): the mutable {} default is kept for interface
        # compatibility; FancyURLopener treats {} as "no proxy".
        Thread.__init__(self, name=threadname)
        urllib.FancyURLopener.__init__(self, proxies)
        self.name = threadname
        self.url = url
        self.filename = filename      # temp file this worker writes
        self.ranges = ranges          # (first_byte, last_byte), inclusive
        self.downloaded = 0           # bytes on disk so far (progress)

    def run(self):
        """Thread entry point: fetch self.ranges of the URL, resuming
        from any partially written temporary file."""
        try:
            # Resume support: bytes already in the temp file count as done.
            self.downloaded = os.path.getsize(self.filename)
        except OSError:
            self.downloaded = 0
        # Rebuild the start point from what is already on disk.
        self.startpoint = self.ranges[0] + self.downloaded
        if self.startpoint >= self.ranges[1]:
            # This part was completed on a previous run.
            print("Part %s has been downloaded over." % self.filename)
            return
        self.oneTimeSize = 16384  # read 16 KiB per iteration
        print("task %s will download from %d to %d" % (self.name, self.startpoint, self.ranges[1]))
        self.addheader("Range", "bytes=%d-%d" % (self.startpoint, self.ranges[1]))
        self.urlhandle = self.open(self.url)
        try:
            # Open the temp file once in append mode instead of reopening
            # it for every chunk (same bytes on disk, fewer syscalls).
            filehandle = open(self.filename, "ab+")
            try:
                data = self.urlhandle.read(self.oneTimeSize)
                while data:
                    filehandle.write(data)
                    self.downloaded += len(data)
                    data = self.urlhandle.read(self.oneTimeSize)
            finally:
                filehandle.close()
        finally:
            # The original leaked the connection handle.
            self.urlhandle.close()
def GetUrlFileSize(url, proxies={}):
    """Return the remote file size in bytes, parsed from the
    Content-Length response header; 0 when the header is absent.

    (Python 2 code: urllib.urlopen / headers list do not exist on 3.)
    """
    urlHandler = urllib.urlopen(url, proxies=proxies)
    length = 0
    for header in urlHandler.info().headers:
        # Match "Content-Length: N" case-insensitively; the original
        # accepted any header merely containing "Length".
        if header.lower().startswith("content-length"):
            length = int(header.split(":")[-1].strip())
    return length
def SpliteBlocks(totalsize, blocknumber):
    """Split *totalsize* bytes into *blocknumber* inclusive
    (start, end) byte ranges; the last range absorbs the remainder.

    Returns a list of ``blocknumber`` tuples covering [0, totalsize-1].
    """
    # Floor division: plain "/" was integer division on Python 2 but
    # yields a float on Python 3, corrupting the byte ranges.
    blocksize = totalsize // blocknumber
    ranges = [(i * blocksize, i * blocksize + blocksize - 1)
              for i in range(blocknumber - 1)]
    ranges.append((blocksize * (blocknumber - 1), totalsize - 1))
    return ranges
def islive(tasks):
    """Return True while at least one download thread is still running."""
    for task in tasks:
        # is_alive() replaces the camelCase alias isAlive(), which was
        # removed in Python 3.9 (is_alive exists since Python 2.6).
        if task.is_alive():
            return True
    return False
def paxel(url, output, blocks=6, proxies=local_proxies):
    """Download *url* into *output* using *blocks* parallel range requests.

    Each worker writes its slice to tmpfile_<i>; once every worker has
    finished, the temp files are concatenated in order into *output*
    and removed.  Progress is redrawn in place on stdout.
    """
    size = GetUrlFileSize(url, proxies)
    ranges = SpliteBlocks(size, blocks)
    threadname = ["thread_%d" % i for i in range(blocks)]
    filename = ["tmpfile_%d" % i for i in range(blocks)]
    tasks = []
    for i in range(blocks):
        task = AxelPython(threadname[i], url, filename[i], ranges[i])
        task.setDaemon(True)
        task.start()
        tasks.append(task)
    time.sleep(2)
    while islive(tasks):
        downloaded = sum(task.downloaded for task in tasks)
        process = downloaded / float(size) * 100
        # "\r" rewinds to the line start so the progress line updates in
        # place (the published code had a literal "r" — a lost escape).
        show = u"\rFilesize:%d Downloaded:%d Completed:%.2f%%" % (size, downloaded, process)
        sys.stdout.write(show)
        sys.stdout.flush()
        time.sleep(0.5)
    # Concatenate the per-thread temp files, in order, into the output.
    filehandle = open(output, "wb+")
    try:
        for name in filename:
            part = open(name, "rb")
            try:
                filehandle.write(part.read())
            finally:
                part.close()
            try:
                os.remove(name)
            except OSError:
                # Best effort: a leftover temp file is not fatal.
                pass
    finally:
        filehandle.close()
if __name__ == "__main__":
    # Demo: fetch a dmg with 4 range-request threads and no proxy.
    url = "http://dldir1.qq.com/qqfile/QQforMac/QQ_V3.1.1.dmg"
    output = "download.file"
    paxel(url, output, blocks=4, proxies={})
添加了https用户名密码验证:
由于需要下载https带ssl验证的文件,而且经常会出现单进程卡死的情况就考虑用了多线程下载
import os
import getpass,urllib2,sys
import cookielib
import threading,thread
# Replace these placeholders with your own server and credentials (the
# published snippet had untranslated Chinese placeholders here, which
# are not valid Python).
url = "https://your.server/path/"   # base https URL to download from
username = "your-username"          # HTTP basic-auth user
password = "your-password"          # HTTP basic-auth password
# class which supplies request authentication info
class TerminalPassword(urllib2.HTTPPasswordMgr):
    """Password manager that falls back to the module-level
    ``username``/``password`` globals when no stored credentials match.

    (Python 2 code: urllib2 does not exist on Python 3.)
    """

    def find_user_password(self, realm, authuri):
        """Return (user, password) for realm/authuri, defaulting to the
        module globals when the base manager has nothing stored."""
        retval = urllib2.HTTPPasswordMgr.find_user_password(self, realm,
                                                            authuri)
        # "is None" for the sentinel test instead of "== None".
        if retval[0] is None and retval[1] is None:
            return (username, password)
        return retval
"""It is a multi-thread downloading tool
It was developed follow axel.
Author: volans
E-mail: volansw [at] gmail.com
modify:gavin ma
date:2011-04-12
"""
def Init():
    """Install a global urllib2 opener with cookie handling and HTTP
    basic auth, then probe the module-level *url*.

    Returns True on success, False on any failure (the original fell
    through and implicitly returned None; callers only test for truth,
    so an explicit False is backward compatible).
    """
    try:
        cj = cookielib.CookieJar()
        opener = urllib2.build_opener(
            urllib2.HTTPCookieProcessor(cj),
            urllib2.HTTPBasicAuthHandler(TerminalPassword()))
        urllib2.install_opener(opener)
        urllib2.urlopen(url)
        return True
    except Exception:
        return False
class AxelPython(threading.Thread):
    """Multi-thread downloading worker (HTTPS + basic-auth variant).

    Each instance downloads one inclusive byte range of url+virus into
    its own temporary file; run() is the Thread entry point.  ``flag``
    tells paxel() whether this part finished cleanly.
    """

    def __init__(self, threadname, url, virus, filename, ranges=0):
        threading.Thread.__init__(self, name=threadname)
        self.name = threadname
        self.url = url              # base URL
        self.virus = virus          # path fragment appended to the URL
        self.filename = filename    # temp file this worker writes
        self.ranges = ranges        # (first_byte, last_byte), inclusive
        self.downloaded = 0         # bytes on disk so far
        self.flag = False           # True once this part completed cleanly

    def run(self):
        """Thread entry point: fetch self.ranges, resuming from any
        partially written temp file."""
        try:
            # Resume support: bytes already in the temp file count as done.
            self.downloaded = os.path.getsize( self.filename )
        except OSError:
            self.downloaded = 0
        # Rebuild the start point from what is already on disk.
        self.startpoint = self.ranges[0] + self.downloaded
        # This part was completed on a previous run.
        if self.startpoint >= self.ranges[1]:
            print "Part %s has been downloaded over." % self.filename
            return
        self.oneTimeSize = 8000  # bytes read per iteration
        try :
            Init()
            req = urllib2.Request(self.url+self.virus)
            req.add_header("Range", "bytes=%d-%d" % (self.startpoint, self.ranges[1]))
            urlhandle = urllib2.urlopen(req)
            data = urlhandle.read(self.oneTimeSize)
            while data:
                filehandle = open( self.filename, "ab+" )
                filehandle.write( data )
                filehandle.close()
                self.downloaded += len( data )
                data = urlhandle.read( self.oneTimeSize )
            self.flag = True
        except :
            # NOTE(review): bare except silently swallows every error
            # (even KeyboardInterrupt); flag stays False so paxel()
            # re-runs this part.
            self.flag = False
            pass
def GetUrlFileSize(url, samplevirus):
    """Return the Content-Length of url+samplevirus in bytes, or 0 when
    the auth probe fails or the header is missing.

    (Python 2 code: urllib2 does not exist on Python 3.)
    """
    # Initialise before the guard: the original only assigned length
    # inside the if-branch, so a failed Init() raised NameError on the
    # final return.
    length = 0
    if Init():
        req = urllib2.Request(url + samplevirus)
        urlHandler = urllib2.urlopen(req)
        for header in urlHandler.info().headers:
            if header.find("Content-Length") != -1:
                length = int(header.split(":")[-1].strip())
    return length
def SpliteBlocks(totalsize, blocknumber):
    """Split *totalsize* bytes into *blocknumber* inclusive
    (start, end) byte ranges; the last range absorbs the remainder.
    """
    # Floor division: plain "/" yields a float on Python 3, corrupting
    # the byte ranges.
    blocksize = totalsize // blocknumber
    ranges = [(i * blocksize, i * blocksize + blocksize - 1)
              for i in range(blocknumber - 1)]
    ranges.append((blocksize * (blocknumber - 1), totalsize - 1))
    return ranges
def islive(tasks):
    """Return True while at least one download thread is still running."""
    for task in tasks:
        # is_alive() replaces the camelCase alias isAlive(), which was
        # removed in Python 3.9 (is_alive exists since Python 2.6).
        if task.is_alive():
            return True
    return False
def paxel(url, samplevirus, output, blocks=6):
    """Download url+samplevirus into *output* with *blocks* range threads.

    NOTE(review): this function depends on two names not defined in this
    snippet: a module-level ``path`` (temp-file directory) and the
    ``time`` module (not imported above) — confirm against the full file.
    """
    size = GetUrlFileSize( url, samplevirus )
    ran = SpliteBlocks( size, blocks )
    threadname = [ "thread_%d" % i for i in range(0, blocks) ]
    filename = [ path + os.sep + "tmpfile_%d" % i for i in range(0, blocks) ]
    tasks = []
    for i in range(0, blocks):
        task = AxelPython(threadname[i], url, samplevirus, filename[i], ran[i])
        task.setDaemon( True )
        task.start()
        tasks.append( task )
    # finish/count are deliberately module-global progress state.
    global finish,count
    finish = True
    count = 0
    while finish :
        for task in tasks :
            if not task.isAlive():
                # NOTE(review): calling run() directly re-executes the
                # download in *this* thread — a Thread object cannot be
                # restarted, so this is a synchronous retry hack.
                task.run()
            time.sleep(0.5)
            if task.flag:
                count+=1
                # NOTE(review): removing from the list being iterated
                # skips the following element on this pass.
                tasks.remove(task)
        # print count
        if count == blocks:
            finish = False
    # print "has done"
    time.sleep( 2 )
    # Concatenate the per-thread temp files, in order, into the output.
    filehandle = open( output, "wb+" )
    for i in filename:
        f = open( i, "rb" )
        filehandle.write( f.read() )
        f.close()
        try:
            os.remove(i)
            pass
        except:
            pass
    filehandle.close()
def main():
    """Example entry point.

    The published snippet used untranslated Chinese placeholders for the
    three arguments; substitute your own base URL, remote file path and
    local output file before running.
    """
    paxel("https://your.server/", "path/to/file", "download.file", blocks=6)


if __name__ == "__main__":
    main()
实现了下载平均速度的展示
知道文件的url地址就用urllib模块的urlretrieve函数。
urllib.urlretrieve(url, filename)
filename是要保存到本地的文件名。函数后面还有2个可选参数,要用就看帮助文档吧。
多线下载的话,每一线程要指定下载服务器上文件的哪一块。http协议中head里可以指定Range。
下面用的是urllib2模块
request = urllib2.Request(url)
request.add_header("Range", "bytes=%d-%d" % (1024, 2048))  #指定下载文件的范围
opener = urllib2.build_opener()
data = opener.open(request).read()
现在data里面就是文件的1024字节到2048字节的内容。
例子:
#coding=utf-8
import re
import os
import sys
import time
import glob
import string
import socket
import getopt
import urllib
import urllib2
import threading
from sgmllib import SGMLParser
#############################################################################
#
# self-defined exception classes
#
#############################################################################
class ConnectionError(Exception):
    """Raised when an HTTP connection cannot be established.

    NOTE: shadows the Python 3 builtin of the same name.
    """


class URLUnreachable(Exception):
    """Raised when the target URL cannot be reached or has no size."""


class CanotDownload(Exception):
    """Raised when the download cannot proceed."""
#############################################################################
#
# multiple threads download module starts here
#
#############################################################################
class HttpGetThread(threading.Thread):
    """Worker thread that downloads one inclusive byte range of *url*
    into *filename*.

    Retries a failed range request up to 10 times, resuming each attempt
    from whatever was already written to the temp file.
    """

    def __init__(self, name, url, filename, range=0):
        # NOTE(review): the published snippet was garbled here (the
        # Thread constructor call was fused with an assignment); the
        # constructor and attribute assignments are restored.
        threading.Thread.__init__(self, name=str(name))
        self.url = url
        self.filename = filename          # temp file this worker writes
        self.range = range                # (first_byte, last_byte), inclusive
        self.totalLength = range[1] - range[0] + 1
        try:
            # Resume support: existing temp-file bytes count as done.
            self.downloaded = os.path.getsize(self.filename)
        except OSError:
            self.downloaded = 0
        self.percent = self.downloaded / float(self.totalLength) * 100
        self.headerrange = (self.range[0] + self.downloaded, self.range[1])
        self.bufferSize = 8192            # bytes read per iteration

    def run(self):
        """Thread entry point: download the assigned range, retrying up
        to 10 times on any error."""
        try:
            self.downloaded = os.path.getsize(self.filename)
        except OSError:
            self.downloaded = 0
        self.percent = self.downloaded / float(self.totalLength) * 100
        self.bufferSize = 8192
        downloadAll = False
        retries = 1
        while not downloadAll:
            if retries > 10:
                break
            try:
                # Re-issue the request from the current resume offset.
                self.headerrange = (self.range[0] + self.downloaded, self.range[1])
                request = urllib2.Request(self.url)
                request.add_header("Range", "bytes=%d-%d" % self.headerrange)
                conn = urllib2.urlopen(request)
                startTime = time.time()
                data = conn.read(self.bufferSize)
                while data:
                    f = open(self.filename, "ab")
                    f.write(data)
                    f.close()
                    self.time = int(time.time() - startTime)
                    self.downloaded += len(data)
                    self.percent = self.downloaded / float(self.totalLength) * 100
                    data = conn.read(self.bufferSize)
                downloadAll = True
            except Exception:
                # Python-3-valid form of the original "except Exception, err".
                retries += 1
                time.sleep(1)
                continue
def Split(size, blocks):
    """Split *size* bytes into *blocks* inclusive (start, end) ranges;
    the last range absorbs the remainder of the division."""
    ranges = []
    # "//" keeps integer arithmetic on Python 3; "range" replaces the
    # Python-2-only "xrange" (works on both interpreters).
    blocksize = size // blocks
    for i in range(blocks - 1):
        ranges.append((i * blocksize, i * blocksize + blocksize - 1))
    ranges.append((blocksize * (blocks - 1), size - 1))
    return ranges
def GetHttpFileSize(url):
    """Return the remote file size from a Length response header, or 0
    when the request fails or no such header is present.

    (Python 2 code: urllib.urlopen / headers list do not exist on 3.)
    """
    length = 0
    try:
        conn = urllib.urlopen(url)
        for header in conn.info().headers:
            # Loose match kept from the original: any header containing
            # "Length" is accepted.
            if header.find("Length") != -1:
                length = int(header.split(":")[-1].strip())
    except Exception:
        # Python-3-valid form of the original "except Exception, err";
        # a network failure simply reports size 0.
        pass
    return length
def hasLive(ts):
    """Return True while at least one worker thread is still running."""
    for t in ts:
        # is_alive() replaces the camelCase alias isAlive(), which was
        # removed in Python 3.9 (is_alive exists since Python 2.6).
        if t.is_alive():
            return True
    return False
def MyHttpGet(url, output=None, connections=4):
    """Download *url* with *connections* parallel range requests and an
    average-rate progress display.

    arguments:
        url, in GBK encoding
        output, target file name; defaults to the last URL path segment
        connections, number of worker threads

    Raises URLUnreachable when the file size cannot be determined.
    """
    length = GetHttpFileSize(url)
    print(length)
    mb = length / 1024 / 1024.0
    if length == 0:
        raise URLUnreachable
    blocks = connections
    if output:
        filename = output
    else:
        # BUG FIX: the original set only ``output`` here and left
        # ``filename`` undefined, crashing with NameError at the merge.
        output = url.split("/")[-1]
        filename = output
    ranges = Split(length, blocks)
    names = ["%s_%d" % (output, i) for i in range(blocks)]
    ts = []
    for i in range(blocks):
        t = HttpGetThread(i, url, names[i], ranges[i])
        t.setDaemon(True)
        t.start()
        ts.append(t)
    live = hasLive(ts)
    startSize = sum(t.downloaded for t in ts)
    startTime = time.time()
    etime = 0
    while live:
        try:
            etime = time.time() - startTime
            d = sum(t.downloaded for t in ts) / float(length) * 100
            downloadedThistime = sum(t.downloaded for t in ts) - startSize
            try:
                rate = downloadedThistime / float(etime) / 1024
            except ZeroDivisionError:
                rate = 100.0
            # "\r" rewinds to the line start so the progress line is
            # redrawn in place (the published code had a literal "r").
            progressStr = u"\rFilesize: %d(%.2fM) Downloaded: %.2f%% Avg rate: %.1fKB/s" % (length, mb, d, rate)
            sys.stdout.write(progressStr)
            sys.stdout.flush()
            live = hasLive(ts)
            time.sleep(0.2)
        except KeyboardInterrupt:
            # Ctrl-C: remove the partial temp files and abort.
            print("Exit...")
            for n in names:
                try:
                    os.remove(n)
                except OSError:
                    pass
            sys.exit(1)
    # Merge the per-thread files, in order, then clean up (the original
    # leaked each part-file handle).
    f = open(filename, "wb")
    for n in names:
        part = open(n, "rb")
        f.write(part.read())
        part.close()
        try:
            os.remove(n)
        except OSError:
            pass
    f.close()
if __name__ == "__main__":
    # Demo: fetch the page into hi.html using 4 connections.
    MyHttpGet("http://hi.baidu.com/zjw0358", "hi.html", 4)