【python 多线程】Python实现多线程下载文件方法总结

更新时间:2019-10-03    来源:python    手机版     字体:

【www.bbyears.com--python】

今天把刚学python时收藏的几篇多线程下载文件的网页看了一下。

实现简单的多线程下载,需要关注如下几点:
1.文件的大小:可以从response header中提取,如“Content-Length:911”表示大小是911字节
2.任务拆分:指定各个线程下载的文件的哪一块,可以通过request header中添加“Range: bytes=300-400”(表示下载300~400byte的内容),注意可以请求的文件的range是[0, size-1]字节的。
3.下载文件的聚合:各个线程将自己下载的文件块保存为临时文件,所有线程都完成后,再将这些临时文件按顺序聚合写入到最终的一个文件中。

网上看到一个不错的多线程下载文件的例子,是根据Linux上多线程下载工具axel的思想来写的,源代码在:http://fayaa.com/code/view/58/full/
我将其收录起来,后续需要时可以基于它来修改,示例代码如下:
https://github.com/smilejay/python/blob/master/py2014/paxel.py

 代码如下 #!/usr/bin/python
# -*- coding: utf-8 -*-
# filename: paxel.py
# FROM: http://fayaa.com/code/view/58/full/
# Jay modified it a little and save for further potential usage.
 
"""It is a multi-thread downloading tool
 
    It was developed following axel.
        Author: volans
        E-mail: volansw [at] gmail.com
"""
 
import sys
import os
import time
import urllib
from threading import Thread
 
# in case you want to use http_proxy
local_proxies = {"http": "http://131.139.58.200:8080"}
 
 
class AxelPython(Thread, urllib.FancyURLopener):
    """Multi-thread downloading worker (modelled after the axel tool).

    Each instance is both a Thread and a FancyURLopener: run() fetches
    the byte range ``ranges`` of ``url`` and appends it to the temp file
    ``filename``, resuming from whatever is already on disk.
    NOTE(review): Python 2 code (print statements, urllib.FancyURLopener).
    """
    def __init__(self, threadname, url, filename, ranges=0, proxies={}):
        # NOTE(review): mutable default ``proxies={}`` and int default
        # ``ranges=0`` (run() indexes it like a 2-tuple) kept as-is.
        Thread.__init__(self, name=threadname)
        urllib.FancyURLopener.__init__(self, proxies)
        self.name = threadname
        self.url = url
        self.filename = filename
        self.ranges = ranges  # (start_byte, end_byte), inclusive
        self.downloaded = 0   # bytes written so far, read by the progress loop
 
    def run(self):
        """Thread entry point: download this worker's byte range, resuming."""
        try:
            # resume support: count whatever a previous run already wrote
            self.downloaded = os.path.getsize(self.filename)
        except OSError:
            # temp file does not exist yet -> nothing downloaded
            self.downloaded = 0
 
        # rebuild the real start point, skipping bytes already on disk
        self.startpoint = self.ranges[0] + self.downloaded
 
        # This part is already completed
        if self.startpoint >= self.ranges[1]:
            print "Part %s has been downloaded over." % self.filename
            return
 
        self.oneTimeSize = 16384  # bytes read per chunk (16 KiB)
        print "task %s will download from %d to %d" % (self.name, self.startpoint, self.ranges[1])
 
        # HTTP Range header: request only this worker's slice
        self.addheader("Range", "bytes=%d-%d" % (self.startpoint, self.ranges[1]))
        self.urlhandle = self.open(self.url)
 
        data = self.urlhandle.read(self.oneTimeSize)
        while data:
            # reopen in append mode per chunk so progress survives a crash
            filehandle = open(self.filename, "ab+")
            filehandle.write(data)
            filehandle.close()
 
            self.downloaded += len(data)
 
            data = self.urlhandle.read(self.oneTimeSize)
 
 
def GetUrlFileSize(url, proxies={}):
    """Return the remote file size in bytes, 0 when no Length header exists."""
    handle = urllib.urlopen(url, proxies=proxies)
    size = 0
    # scan the raw header lines; the last one containing "Length" wins
    for line in handle.info().headers:
        if "Length" in line:
            size = int(line.split(":")[-1].strip())
    return size
 
 
def SpliteBlocks(totalsize, blocknumber):
    """Split *totalsize* bytes into *blocknumber* contiguous ranges.

    Returns a list of (start, end) tuples with *inclusive* byte offsets,
    suitable for HTTP "Range: bytes=start-end" headers.  The last block
    absorbs any division remainder.

    Fix: use floor division (//) so the offsets stay integers under
    Python 3 as well (plain / became true division there).
    """
    blocksize = totalsize // blocknumber
    ranges = []
    for i in range(0, blocknumber - 1):
        ranges.append((i * blocksize, i * blocksize + blocksize - 1))
    # final block runs to the end of the file, picking up the remainder
    ranges.append((blocksize * (blocknumber - 1), totalsize - 1))

    return ranges
 
 
def islive(tasks):
    """Return True while at least one download thread is still running.

    Fix: call is_alive() (available since Python 2.6) instead of the
    camel-case isAlive(), which was removed in Python 3.9.
    """
    return any(task.is_alive() for task in tasks)
 
 
def paxel(url, output, blocks=6, proxies=local_proxies):
    """Download *url* into *output* using *blocks* parallel range requests.

    Each AxelPython worker stores its slice in a tmpfile_<i> file; once
    every thread finishes, the pieces are concatenated in block order
    into *output* and the temp files are removed.

    Fixes: the progress line now starts with a real carriage return
    ("\r" — the backslash was lost as a plain "r" in the original), and
    the temp-file cleanup only swallows OSError instead of everything.
    """
    size = GetUrlFileSize(url, proxies)
    ranges = SpliteBlocks(size, blocks)

    threadname = ["thread_%d" % i for i in range(0, blocks)]
    filename = ["tmpfile_%d" % i for i in range(0, blocks)]

    # spawn one daemon worker per block
    tasks = []
    for i in range(0, blocks):
        task = AxelPython(threadname[i], url, filename[i], ranges[i])
        task.setDaemon(True)
        task.start()
        tasks.append(task)

    # progress loop: poll the workers twice a second until all are done
    time.sleep(2)
    while islive(tasks):
        downloaded = sum([task.downloaded for task in tasks])
        process = downloaded / float(size) * 100
        show = u"\rFilesize:%d Downloaded:%d Completed:%.2f%%" % (size, downloaded, process)
        sys.stdout.write(show)
        sys.stdout.flush()
        time.sleep(0.5)

    # stitch the temp files together in block order, then clean up
    filehandle = open(output, "wb+")
    for i in filename:
        f = open(i, "rb")
        filehandle.write(f.read())
        f.close()
        try:
            os.remove(i)
        except OSError:
            # best-effort cleanup: a missing temp file is not fatal
            pass

    filehandle.close()
 
if __name__ == "__main__":
    # demo: download a large binary with 4 threads and no proxy
    url = "http://dldir1.qq.com/qqfile/QQforMac/QQ_V3.1.1.dmg"
    output = "download.file"
    paxel(url, output, blocks=4, proxies={})




添加了https用户名密码验证:

由于需要下载https带ssl验证的文件,而且经常会出现单进程卡死的情况就考虑用了多线程下载

 代码如下 # -*- coding:utf8 -*-
import os
import getpass,urllib2,sys
import cookielib
import threading,thread
# Connection settings — fill these in before running.
# Fix: the original lines were bare Chinese placeholders, which is a
# SyntaxError; they are now valid placeholder strings.
url = "https://example.com/path/to/file"  # your https:// URL
username = "your-username"                # your user name
password = "your-password"                # your password

# class which supplies request authentication info
class TerminalPassword(urllib2.HTTPPasswordMgr):
    """Password manager that falls back to the module-level credentials.

    When no stored user/password matches (realm, authuri), return the
    global ``username``/``password`` pair instead of (None, None).

    Fix: compare against None with ``is``, not ``==`` (idiom).
    """
    def find_user_password(self, realm, authuri):
        retval = urllib2.HTTPPasswordMgr.find_user_password(self, realm,
                                                            authuri)
        if retval[0] is None and retval[1] is None:
            # nothing stored: use the module-level credentials
            return (username, password)
        else:
            return retval
"""It is a multi-thread downloading tool
    It was developed follow axel.
        Author: volans
        E-mail: volansw [at] gmail.com
    modify:gavin ma
    date:2011-04-12
"""
def Init():
    """Install a global urllib2 opener with cookie + basic-auth support.

    Returns True when the probe request to the module-level ``url``
    succeeds, False otherwise.

    Fixes: ``except Exception, e`` is Python-2-only syntax (use the
    bare ``except Exception:`` form, valid on 2.6+ and 3); the failure
    path now returns an explicit False instead of silently None.
    """
    try:
        cj = cookielib.CookieJar()
        opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj),
                                      urllib2.HTTPBasicAuthHandler(TerminalPassword()))
        urllib2.install_opener(opener)
        urllib2.urlopen(url)
        return True
    except Exception:
        return False
class AxelPython(threading.Thread):
    """Multi-thread downloading class (https variant with auth).

    run() downloads this worker's byte range of url+virus into
    ``filename``, resuming from any bytes already on disk; ``flag``
    records whether the range finished without an exception.
    NOTE(review): Python 2 code (print statement, urllib2).
    """
    def __init__(self, threadname, url, virus, filename, ranges=0):
        threading.Thread.__init__(self, name=threadname)
        self.name = threadname
        self.url = url
        self.virus = virus        # path suffix appended to url for the request
        self.filename = filename  # temp file holding this block
        self.ranges = ranges      # (start, end) inclusive byte range
        self.downloaded = 0       # bytes written so far
        self.flag = False         # set True once this range completes cleanly
    def run(self):
        """Thread entry point: fetch the assigned byte range, resuming."""
        try:
            # resume: count whatever a previous attempt already wrote
            self.downloaded = os.path.getsize( self.filename )
        except OSError:
            self.downloaded = 0
        # rebuild the start point, skipping bytes already on disk
        self.startpoint = self.ranges[0] + self.downloaded

        # This part is already completed
        if self.startpoint >= self.ranges[1]:
            print "Part %s has been downloaded over." % self.filename
            return
        self.oneTimeSize = 8000  # bytes read per chunk (~8 kB; original comment said 16k)
        try :
            Init()
            req = urllib2.Request(self.url+self.virus)
            req.add_header("Range", "bytes=%d-%d" % (self.startpoint, self.ranges[1]))
            urlhandle = urllib2.urlopen(req)
            data = urlhandle.read(self.oneTimeSize)
            while data:
                # append each chunk so progress survives a crash
                filehandle = open( self.filename, "ab+" )
                filehandle.write( data )
                filehandle.close()
                self.downloaded += len( data )
                data = urlhandle.read( self.oneTimeSize )
            self.flag = True
        except :
            # NOTE(review): bare except hides every error; flag stays False
            # so paxel() will retry this block
            self.flag = False
            pass

           
def GetUrlFileSize(url, samplevirus):
    """Return the Content-Length of url+samplevirus in bytes, or 0.

    Fix: the original fell off the end (returning None) whenever Init()
    failed, which made SpliteBlocks(size, blocks) crash downstream; now
    every path returns an int.
    """
    length = 0
    if Init():
        req = urllib2.Request(url + samplevirus)
        urlHandler = urllib2.urlopen(req)
        # scan raw header lines for the Content-Length field
        for header in urlHandler.info().headers:
            if header.find("Content-Length") != -1:
                length = int(header.split(":")[-1].strip())
    return length
def SpliteBlocks(totalsize, blocknumber):
    """Split *totalsize* bytes into *blocknumber* inclusive (start, end) ranges.

    The last block absorbs the division remainder.

    Fix: floor division (//) keeps the offsets integral under Python 3,
    where plain / became true division.
    """
    blocksize = totalsize // blocknumber
    ranges = []
    for i in range(0, blocknumber - 1):
        ranges.append((i * blocksize, i * blocksize + blocksize - 1))
    ranges.append((blocksize * (blocknumber - 1), totalsize - 1))
    return ranges
def islive(tasks):
    """Return True while any of *tasks* is still a running thread.

    Fix: is_alive() replaces the camel-case isAlive(), which was
    removed in Python 3.9 (is_alive exists since Python 2.6).
    """
    return any(task.is_alive() for task in tasks)
def paxel(url, samplevirus, output, blocks=6):
    """Download url+samplevirus with *blocks* threads into *output*.

    NOTE(review): this function inherits several latent problems from
    the original article: ``path`` (used for temp-file names) and
    ``time`` are never defined/imported in this snippet; finished
    workers are retried by calling run() synchronously on the MAIN
    thread; and ``tasks`` is mutated while being iterated. Verify all
    of these before reusing the code.
    """
    size = GetUrlFileSize( url, samplevirus )
    ran = SpliteBlocks( size, blocks )
    threadname = [ "thread_%d" % i for i in range(0, blocks) ]
    # NOTE(review): ``path`` is not defined anywhere in this snippet
    filename = [ path + os.sep + "tmpfile_%d" % i for i in range(0, blocks) ]
    # spawn one daemon worker per block
    tasks = []
    for i in range(0, blocks):
        task = AxelPython(threadname[i], url, samplevirus, filename[i], ran[i])
        task.setDaemon( True )
        task.start()
        tasks.append( task )
    global finish,count
    finish = True
    count = 0
    while finish :
         for task in tasks :
             # isAlive() is the Python-2 spelling (removed in 3.9)
             if not task.isAlive():
                # NOTE(review): run() re-executes the download
                # synchronously on this (main) thread as a retry
                task.run()
                time.sleep(0.5)
             if task.flag:
                count+=1
                # NOTE(review): removing from the list being iterated
                # skips the element after ``task`` on this pass
                tasks.remove(task)
               # print count
             if count == blocks:
                finish = False
               # print "has done"

    time.sleep( 2 )
    # concatenate the temp files in block order, then clean up
    filehandle = open( output, "wb+" )
    for i in filename:
        f = open( i, "rb" )
        filehandle.write( f.read() )
        f.close()
        try:
            os.remove(i)
            pass
        except:
            pass
    filehandle.close()

                                       
def main():
    """Example entry point for the authenticated downloader.

    Fix: the original call used bare Chinese placeholders
    (``paxel(网址, 下载文件, 输出目录, ...)``), which is a SyntaxError;
    replace the string arguments with your own values before running.
    """
    paxel("https://example.com/", "file.bin", "download.out", blocks=6)

if  __name__ == "__main__":
    main()




实现了下载平均速度的展示


知道文件的url地址就用urllib模块的urlretrieve函数。
urllib.urlretrieve(url, filename)
filename是要保存到本地的文件名。函数后面还有2个可选参数,要用就看帮助文档吧。

多线下载的话,每一线程要指定下载服务器上文件的哪一块。http协议中head里可以指定Range。
下面用的是urllib2模块

request = urllib2.Request(url)
request.add_header("Range", "bytes=%d-%d" % (1024, 2048)) #指定下载文件的范围

opener = urllib2.build_opener()
data = opener.open(request).read()

现在data里面就是文件的1024字节到2048字节的内容。

例子:

 代码如下 #!/usr/bin/env python
#coding=utf-8
import re
import os
import sys
import time
import glob
import string
import socket
import getopt
import urllib
import urllib2
import threading
from sgmllib import SGMLParser

#############################################################################
#
# self-defined exception classes
#
#############################################################################
# Self-defined download exceptions.
# NOTE: ConnectionError shadows the Python 3 builtin of the same name;
# the original (Python 2) name is kept so existing callers still work.
class ConnectionError(Exception):
    """Raised when the HTTP connection fails."""
    pass


class URLUnreachable(Exception):
    """Raised when the target URL reports no usable content length."""
    pass


class CanotDownload(Exception):
    """Raised when the file cannot be downloaded."""
    pass

#############################################################################
#
# multiple threads download module starts here
#
#############################################################################
class HttpGetThread(threading.Thread):
    """Worker thread that downloads one byte range of a URL.

    Progress is exposed through ``downloaded`` (bytes on disk) and
    ``percent``; run() retries up to 10 times, each attempt resuming
    from the bytes already written to ``filename``.
    """
    def __init__(self, name, url, filename, range=0):
        # Fix: this line was garbled by web extraction in the original
        # ("threading.Thread.__init__(self, >        self.url = url");
        # reconstructed to pass the thread name through to Thread.
        threading.Thread.__init__(self, name=str(name))
        self.url = url
        self.filename = filename
        self.range = range                          # (start, end), inclusive
        self.totalLength = range[1] - range[0] + 1  # bytes this worker owns
        try:
            self.downloaded = os.path.getsize(self.filename)
        except OSError:
            self.downloaded = 0                     # temp file not created yet
        self.percent = self.downloaded / float(self.totalLength) * 100
        # resume point: skip whatever is already on disk
        self.headerrange = (self.range[0] + self.downloaded, self.range[1])
        self.bufferSize = 8192

    def run(self):
        """Fetch the assigned range, retrying (max 10 attempts) on any error."""
        try:
            self.downloaded = os.path.getsize(self.filename)
        except OSError:
            self.downloaded = 0
        self.percent = self.downloaded / float(self.totalLength) * 100
        self.bufferSize = 8192
        downloadAll = False
        retries = 1
        while not downloadAll:
            if retries > 10:
                break
            try:
                # recompute the range each attempt so a retry resumes
                self.headerrange = (self.range[0] + self.downloaded, self.range[1])
                request = urllib2.Request(self.url)
                request.add_header("Range", "bytes=%d-%d" % self.headerrange)
                conn = urllib2.urlopen(request)
                startTime = time.time()
                data = conn.read(self.bufferSize)
                while data:
                    f = open(self.filename, "ab")
                    f.write(data)
                    f.close()
                    self.time = int(time.time() - startTime)
                    self.downloaded += len(data)
                    self.percent = self.downloaded / float(self.totalLength) * 100
                    data = conn.read(self.bufferSize)
                downloadAll = True
            except Exception:
                # Fix: "except Exception, err" is Python-2-only syntax;
                # err was unused, so the binding is dropped entirely.
                retries += 1
                time.sleep(1)
                continue

def Split(size, blocks):
    """Split *size* bytes into *blocks* inclusive (start, end) ranges.

    The final block absorbs the division remainder.

    Fixes: floor division (//) keeps offsets integral under Python 3
    (plain / became true division there); ``xrange`` is Python-2-only
    and ``range`` is an exact substitute in this loop.
    """
    ranges = []
    blocksize = size // blocks
    for i in range(blocks - 1):
        ranges.append((i * blocksize, i * blocksize + blocksize - 1))
    ranges.append((blocksize * (blocks - 1), size - 1))

    return ranges

def GetHttpFileSize(url):
    """Return the Content-Length of *url* in bytes (0 on any failure).

    Fix: ``except Exception, err`` is Python-2-only syntax and the
    binding was unused; use the version-agnostic ``except Exception:``.
    """
    length = 0
    try:
        conn = urllib.urlopen(url)
        headers = conn.info().headers
        for header in headers:
            if header.find("Length") != -1:
                length = int(header.split(":")[-1].strip())
    except Exception:
        # best effort: any network/parse error yields length 0
        pass

    return length

def hasLive(ts):
    """Return True while any download thread in *ts* is still running.

    Fix: is_alive() replaces isAlive(), which was removed in Python 3.9
    (is_alive exists since Python 2.6).
    """
    return any(t.is_alive() for t in ts)

def MyHttpGet(url, output=None, connections=4):
    """Download *url* with *connections* parallel range-request threads.

    arguments:
        url, in GBK encoding
        output, local file name; defaults to the last URL path segment
        connections, number of worker threads / blocks

    Raises URLUnreachable when no content length can be determined.

    Fixes: ``filename`` was never assigned when *output* was omitted
    (NameError at the merge step); the progress line now begins with a
    real "\r" (lost as a plain "r" in the original); Python-2-only
    ``print`` statements and ``xrange`` replaced with forms valid on
    both 2 and 3; bare excepts narrowed to the errors actually expected.
    """
    length = GetHttpFileSize(url)
    print(length)
    mb = length / 1024 / 1024.0
    if length == 0:
        raise URLUnreachable
    blocks = connections
    if output is None:
        output = url.split("/")[-1]
    filename = output  # fix: define the merge target on every path
    ranges = Split(length, blocks)
    names = ["%s_%d" % (output, i) for i in range(blocks)]

    # start one daemon worker per block
    ts = []
    for i in range(blocks):
        t = HttpGetThread(i, url, names[i], ranges[i])
        t.setDaemon(True)
        t.start()
        ts.append(t)

    live = hasLive(ts)
    startSize = sum([t.downloaded for t in ts])
    startTime = time.time()
    etime = 0
    while live:
        try:
            etime = time.time() - startTime
            d = sum([t.downloaded for t in ts]) / float(length) * 100
            downloadedThistime = sum([t.downloaded for t in ts]) - startSize
            try:
                rate = downloadedThistime / float(etime) / 1024
            except ZeroDivisionError:
                rate = 100.0  # placeholder until etime is measurable
            progressStr = u"\rFilesize: %d(%.2fM) Downloaded: %.2f%% Avg rate: %.1fKB/s" % (length, mb, d, rate)
            sys.stdout.write(progressStr)
            sys.stdout.flush()
            live = hasLive(ts)
            time.sleep(0.2)
        except KeyboardInterrupt:
            # user abort: remove the partial temp files and exit
            print("")
            print("Exit...")
            for n in names:
                try:
                    os.remove(n)
                except OSError:
                    pass
            sys.exit(1)

    print("")

    # merge the temp pieces in block order, then delete them
    f = open(filename, "wb")
    for n in names:
        f.write(open(n, "rb").read())
        try:
            os.remove(n)
        except OSError:
            pass
    f.close()


if __name__ == "__main__":
    # demo: fetch a web page with 4 connections into hi.html
    MyHttpGet("http://hi.baidu.com/zjw0358","hi.html",4)

本文来源:http://www.bbyears.com/jiaocheng/70853.html

热门标签

更多>>

本类排行