V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
3023369823
V2EX  ›  Python

dht 磁力源代码共享, python 语言(不了解请参看磁力百科)

  •  
  •   3023369823 · 2016-08-22 09:46:35 +08:00 · 4813 次点击
    这是一个创建于 3020 天前的主题,其中的信息可能已经有所发展或是发生改变。

    之前我在写百度网盘爬虫、百度图片爬虫的时候答应过网友,抽时间要把 ok 搜搜的源码公开,如今是时候兑现诺言了。下面就是爬虫的所有代码,完全、彻底地公开,无论你会不会写程序都可以使用,不过请先装个 linux 系统,具备公网条件,然后运行:

    python startCrawler.py
    

    有必要提醒你,数据库字段代码中都有,请你自己建张表格,这个太简单了,就不多说了。同时我也提供一下下载地址,源码都在:下载地址 1 下载地址 2

    #!/usr/bin/env python
    # encoding: utf-8
    """
    author:haoning
    create time:2015.8.1
    """
    import hashlib
    import os
    import time
    import datetime
    import traceback
    import sys
    import random
    import json
    import socket
    import threading
    from hashlib import sha1 #进行 hash 加密
    from random import randint
    from struct import unpack
    from socket import inet_ntoa
    from threading import Timer, Thread
    from time import sleep
    from collections import deque
    from Queue import Queue
    
    import MySQLdb as mdb  #数据库连接器
    
    import metautils
    import downloadTorrent
    from bencode import bencode, bdecode
    import pygeoip
    
    # MySQL connection settings passed to mdb.connect() in Master.__init__.
    # NOTE(review): the credentials are hard-coded; consider loading them from configuration.
    DB_HOST = '127.0.0.1'
    DB_USER = 'root'
    DB_PASS = 'root'
    
    # (host, port) pairs that DHTClient.join_DHT() sends its initial find_node queries to.
    BOOTSTRAP_NODES = (
      ("67.215.246.10", 6881),
      ("82.221.103.244", 6881),
      ("23.21.224.150", 6881)
    )
    RATE = 1 # rate control: DHTServer.on_message() handles a query only when its counter is a multiple of RATE
    TID_LENGTH = 2 # number of random bytes in the transaction id of each find_node query
    RE_JOIN_DHT_INTERVAL = 3 # Timer delay (seconds) between re_join_DHT() runs
    TOKEN_LENGTH = 2 # number of leading info_hash bytes used as the get_peers token
    INFO_HASH_LEN = 500000 # cap on queued info_hashes (500k entries is small) so memory use stays bounded
    CACHE_LEN = 100 # database-update cache size; Master.download_metadata() drains the cache above CACHE_LEN/2
    WAIT_DOWNLOAD = 80 # download threads are started once more than this many hashes are waiting
    
    
    # GeoIP database queried by is_ip_allowed(); opened from the file 'GeoIP.dat'.
    geoip = pygeoip.GeoIP('GeoIP.dat')
    
    def is_ip_allowed(ip):
        """Return True when *ip* resolves to one of the accepted country codes.

        The lookup is delegated to the module-level ``geoip`` database; only
        the codes CN, TW, JP, HK and KR are accepted.
        """
        return geoip.country_code_by_addr(ip) in ('CN', 'TW', 'JP', 'HK', 'KR')
    
    def entropy(length):
        """Return *length* random bytes as a byte string.

        Uses os.urandom instead of joining chr(randint(0, 255)) one byte at a
        time: the result is still a byte string on Python 2, and it also stays
        a byte string (not text) on Python 3, which is what sha1() and the
        socket layer expect.

        A non-positive *length* yields an empty byte string, matching the
        previous behaviour of iterating over an empty range.
        """
        if length <= 0:
            return b""
        return os.urandom(length)
    
    def random_id():
        """Return the SHA-1 digest of 20 freshly generated random bytes."""
        return sha1(entropy(20)).digest()
    
    
    def decode_nodes(nodes):
        """Split a packed node string into a list of (nid, ip, port) tuples.

        Each record is 26 bytes: 20 bytes of node id, 4 bytes of IPv4 address
        and a 2-byte big-endian port. An input whose length is not a multiple
        of 26 is rejected as a whole and yields an empty list.
        """
        total = len(nodes)
        if total % 26:
            return []

        decoded = []
        for offset in range(0, total, 26):
            record = nodes[offset:offset + 26]
            address = inet_ntoa(record[20:24])
            port, = unpack("!H", record[24:26])
            decoded.append((record[:20], address, port))
        return decoded
    
    
    def timer(t, f):
        """Schedule callable *f* to run once after *t* seconds on a Timer thread."""
        pending = Timer(t, f)
        pending.start()
    
    
    def get_neighbor(target, nid, end=10):
        """Build an id sharing its first *end* bytes with *target*.

        The remainder (from position *end* onward) is taken from *nid*.
        """
        shared_prefix = target[:end]
        own_suffix = nid[end:]
        return shared_prefix + own_suffix
    
    
    class KNode(object):
        """Plain record for one known DHT node: its id, IP address and port."""

        def __init__(self, nid, ip, port):
            self.nid, self.ip, self.port = nid, ip, port
    
    
    class DHTClient(Thread):
        """Outgoing half of the DHT node.

        Keeps a bounded queue of discovered nodes and sends find_node queries
        to them. This class reads ``self.ufd`` (the UDP socket) and
        ``self.bind_ip`` but never assigns them; DHTServer.__init__ does.
        """

        def __init__(self, max_node_qsize):
            Thread.__init__(self)
            self.setDaemon(True)
            self.nid = random_id()
            self.max_node_qsize = max_node_qsize
            # Bounded deque of KNode entries waiting to be queried.
            self.nodes = deque(maxlen=max_node_qsize)

        def send_krpc(self, msg, address):
            """Bencode *msg* and send it to *address*; any failure is ignored."""
            try:
                payload = bencode(msg)
                self.ufd.sendto(payload, address)
            except Exception:
                pass

        def send_find_node(self, address, nid=None):
            """Send a find_node query for a random target to *address*.

            When *nid* is given, the sender id is built with get_neighbor()
            from *nid* and our own id; otherwise our own id is used.
            """
            if nid:
                sender_id = get_neighbor(nid, self.nid)
            else:
                sender_id = self.nid
            query = {
                "t": entropy(TID_LENGTH),
                "y": "q",
                "q": "find_node",
                "a": {
                    "id": sender_id,
                    "target": random_id(),
                },
            }
            self.send_krpc(query, address)

        def join_DHT(self):
            """Query every bootstrap node."""
            for bootstrap_address in BOOTSTRAP_NODES:
                self.send_find_node(bootstrap_address)

        def re_join_DHT(self):
            """Re-bootstrap when no nodes are queued, then reschedule itself."""
            if not self.nodes:
                self.join_DHT()
            timer(RE_JOIN_DHT_INTERVAL, self.re_join_DHT)

        def auto_send_find_node(self):
            """Loop forever, querying one queued node per tick.

            The tick is 1 / max_node_qsize seconds. A KeyboardInterrupt
            raised during the sleep terminates the process via os._exit(0).
            """
            pause = 1.0 / self.max_node_qsize
            while True:
                try:
                    peer = self.nodes.popleft()
                    self.send_find_node((peer.ip, peer.port), peer.nid)
                except IndexError:
                    # Nothing queued right now.
                    pass
                try:
                    sleep(pause)
                except KeyboardInterrupt:
                    os._exit(0)

        def process_find_node_response(self, msg, address):
            """Queue every usable node listed in a find_node response.

            Nodes whose id is not 20 bytes long, or whose IP equals our own
            bind address, are skipped.
            """
            for nid, ip, port in decode_nodes(msg["r"]["nodes"]):
                if len(nid) != 20 or ip == self.bind_ip:
                    continue
                self.nodes.append(KNode(nid, ip, port))
    
    
    class DHTServer(DHTClient):
        """UDP listener that harvests info_hashes from incoming DHT queries.

        get_peers and announce_peer queries are answered and their info_hash
        is handed to ``master`` (via ``log`` / ``log_announce``); find_node
        responses feed the node queue inherited from DHTClient.
        """

        def __init__(self, master, bind_ip, bind_port, max_node_qsize):
            """Bind the UDP socket on (bind_ip, bind_port) and schedule the first re-join."""
            DHTClient.__init__(self, max_node_qsize)

            self.master = master
            self.bind_ip = bind_ip
            self.bind_port = bind_port
            # Count of incoming queries, used for the RATE throttle in on_message().
            self.speed = 0

            # Query name -> handler; any other query gets an error reply.
            self.process_request_actions = {
                "get_peers": self.on_get_peers_request,
                "announce_peer": self.on_announce_peer_request,
            }

            self.ufd = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
            self.ufd.bind((self.bind_ip, self.bind_port))

            # NOTE(review): run() also calls re_join_DHT() directly and every
            # call reschedules itself, so two periodic re-join chains end up
            # running. Left unchanged to preserve the existing start-up behaviour.
            timer(RE_JOIN_DHT_INTERVAL, self.re_join_DHT)

        def run(self):
            """Receive datagrams forever; malformed or failing messages are dropped."""
            self.re_join_DHT()
            while True:
                try:
                    (data, address) = self.ufd.recvfrom(65536)
                    msg = bdecode(data)
                    self.on_message(msg, address)
                except Exception:
                    # Best effort: a bad packet must never stop the receive loop.
                    pass

        def on_message(self, msg, address):
            """Dispatch one decoded KRPC message.

            Responses ("y" == "r") carrying "nodes" feed the node queue.
            Queries ("y" == "q") are throttled: only every RATE-th query is
            handled, and RATE is re-drawn (1, 1 or 10) every 10000 queries to
            limit CPU use. A query with no registered handler is answered
            with an error reply.
            """
            global RATE  # shared throttle, re-drawn below
            try:
                kind = msg["y"]
                if kind == "r":
                    if "nodes" in msg["r"]:
                        self.process_find_node_response(msg, address)
                elif kind == "q":
                    try:
                        self.speed += 1
                        if self.speed % 10000 == 0:
                            # randint(1, 3): outcomes 1 and 2 mean RATE 1, outcome 3 means RATE 10.
                            RATE = 10 if random.randint(1, 3) == 3 else 1
                            if self.speed > 100000:
                                self.speed = 0
                        if self.speed % RATE == 0:
                            # Handling the query is what yields the info_hash.
                            self.process_request_actions[msg["q"]](msg, address)
                    except KeyError:
                        self.play_dead(msg, address)
            except KeyError:
                pass

        def on_get_peers_request(self, msg, address):
            """Record the requested info_hash and reply with an empty node list.

            The reply token is the first TOKEN_LENGTH bytes of the info_hash.
            Requests missing any expected key are ignored.
            """
            try:
                infohash = msg["a"]["info_hash"]
                tid = msg["t"]
                # Value unused; the lookup rejects requests without a sender id.
                nid = msg["a"]["id"]
                token = infohash[:TOKEN_LENGTH]
                reply = {
                    "t": tid,
                    "y": "r",
                    "r": {
                        "id": get_neighbor(infohash, self.nid),
                        "nodes": "",
                        "token": token
                    }
                }
                self.master.log(infohash, address)
                self.send_krpc(reply, address)
            except KeyError:
                pass

        def on_announce_peer_request(self, msg, address):
            """Record an announced info_hash with the peer's address, then acknowledge.

            The announce is accepted only when its token equals the first
            TOKEN_LENGTH bytes of the info_hash. When "implied_port" is
            present and non-zero, the UDP source port is used instead of the
            "port" argument. The acknowledgement is sent in every case.
            """
            try:
                args = msg["a"]
                infohash = args["info_hash"]
                token = args["token"]
                # Values unused; the lookups reject incomplete requests.
                nid = args["id"]
                tid = msg["t"]

                if infohash[:TOKEN_LENGTH] == token:
                    # The key is "implied_port"; the previous lookup used
                    # "implied_port " (trailing space) and could never match.
                    if "implied_port" in args and args["implied_port"] != 0:
                        port = address[1]
                    else:
                        port = args["port"]
                    self.master.log_announce(infohash, (address[0], port))
            except Exception:
                print('error')
            finally:
                self.ok(msg, address)

        def play_dead(self, msg, address):
            """Answer an unsupported query with a 202 "Server Error" reply."""
            try:
                tid = msg["t"]
                reply = {
                    "t": tid,
                    "y": "e",
                    "e": [202, "Server Error"]
                }
                self.send_krpc(reply, address)
            except KeyError:
                pass

        def ok(self, msg, address):
            """Send a plain acknowledgement carrying an id derived from the sender's id."""
            try:
                tid = msg["t"]
                nid = msg["a"]["id"]
                reply = {
                    "t": tid,
                    "y": "r",
                    "r": {
                        "id": get_neighbor(nid, self.nid)
                    }
                }
                self.send_krpc(reply, address)
            except KeyError:
                pass
    
    
    class Master(Thread):
        """Turns harvested info_hashes into rows of the ``search_hash`` table.

        Pipeline (all stages are driven by the worker threads of start_work):
          queue          -- [address, info_hash] pairs pushed by log()/log_announce()
          cache          -- de-duplicated entries waiting for the database check
          waitDownload   -- hashes not yet in the database, waiting for a download
          metadata_queue -- results filled by downloadTorrent.download_metadata
        """

        def __init__(self):
            Thread.__init__(self)
            self.setDaemon(True)
            self.queue = Queue()
            self.cache = Queue()
            # Number of successful inserts; drives the batched commit in got_torrent().
            self.count = 0
            # Re-entrant lock serialising access to the shared DB connection.
            self.mutex = threading.RLock()
            self.waitDownload = Queue()
            self.metadata_queue = Queue()
            self.dbconn = mdb.connect(DB_HOST, DB_USER, DB_PASS, 'oksousou', charset='utf8')
            self.dbconn.autocommit(False)
            self.dbcurr = self.dbconn.cursor()
            self.dbcurr.execute('SET NAMES utf8')
            self.visited = set()
            # Default text encoding for decode(); parse_metadata() sets it again per torrent.
            self.encoding = 'utf8'

        def lock(self):
            """Acquire the shared lock."""
            self.mutex.acquire()

        def unlock(self):
            """Release the shared lock."""
            self.mutex.release()

        def work(self, item):
            """Worker loop: pull one hash, then run the two locked stages.

            The lock is released in ``finally`` blocks so that an exception in
            one worker (for example a database error) cannot leave the lock
            held and block every other worker.
            """
            print("start thread %s" % (item,))
            while True:
                self.prepare_download_metadata()

                self.lock()
                try:
                    self.download_metadata()
                finally:
                    self.unlock()

                self.lock()
                try:
                    self.got_torrent()
                finally:
                    self.unlock()

        def start_work(self, max):
            """Start *max* daemon threads, each running work()."""
            for item in range(max):
                worker = threading.Thread(target=self.work, args=(item,))
                worker.setDaemon(True)
                worker.start()

        def _enqueue(self, infohash, address):
            """Queue [address, infohash] unless the queue is full or the IP is filtered out."""
            if address is None:
                # No address to filter on or to download from; nothing to queue.
                return
            # Beyond INFO_HASH_LEN pending entries the workers cannot keep up.
            if self.queue.qsize() < INFO_HASH_LEN and is_ip_allowed(address[0]):
                self.queue.put([address, infohash])

        def log_announce(self, binhash, address=None):
            """Queue an info_hash seen in an announce_peer query."""
            self._enqueue(binhash, address)

        def log(self, infohash, address=None):
            """Queue an info_hash seen in a get_peers query."""
            self._enqueue(infohash, address)

        def prepare_download_metadata(self):
            """Move one not-yet-visited hash from the queue into the cache.

            Blocks until an entry is available. The visited set is cleared
            once it holds more than 100000 hashes.
            """
            if self.queue.qsize() == 0:
                sleep(2)
            address, binhash = self.queue.get()
            if binhash in self.visited:
                return
            if len(self.visited) > 100000:
                self.visited = set()
            self.visited.add(binhash)

            self.cache.put((address, binhash, datetime.datetime.utcnow()))

        def download_metadata(self):
            """Drain the cache: refresh known hashes, start downloads for new ones.

            Nothing happens until the cache holds more than CACHE_LEN // 2
            entries. Known hashes get last_seen/requests updated; unknown ones
            go to waitDownload, and once more than WAIT_DOWNLOAD are waiting a
            daemon download thread is started for each of them.
            """
            if self.cache.qsize() <= CACHE_LEN // 2:
                return

            while self.cache.qsize() > 0:
                address, binhash, utcnow = self.cache.get()
                info_hash = binhash.encode('hex')
                self.dbcurr.execute('SELECT id FROM search_hash WHERE info_hash=%s', (info_hash,))
                if self.dbcurr.fetchone():
                    # Already stored: bump last-seen time and request count.
                    self.dbcurr.execute('UPDATE search_hash SET last_seen=%s, requests=requests+1 WHERE info_hash=%s', (utcnow, info_hash))
                else:
                    self.waitDownload.put((address, binhash))
            self.dbconn.commit()

            if self.waitDownload.qsize() > WAIT_DOWNLOAD:
                while self.waitDownload.qsize() > 0:
                    address, binhash = self.waitDownload.get()
                    fetcher = threading.Thread(target=downloadTorrent.download_metadata, args=(address, binhash, self.metadata_queue))
                    fetcher.setDaemon(True)
                    fetcher.start()

        def decode(self, s):
            """Decode byte string *s* to text, trying several encodings in turn.

            A list is first joined with ';'. The encodings tried are
            self.encoding, utf8, gbk and big5; if all fail, *s* is decoded
            with self.encoding while ignoring undecodable bytes.
            """
            if isinstance(s, list):
                s = ';'.join(s)
            for candidate in (self.encoding, 'utf8', 'gbk', 'big5'):
                try:
                    return s.decode(candidate)
                except (UnicodeError, LookupError):
                    # Wrong or unknown encoding: try the next candidate.
                    pass
            return s.decode(self.encoding, 'ignore')

        def decode_utf8(self, d, i):
            """Return d[i] as text, preferring the explicit ``<i>.utf-8`` entry when present."""
            if i + '.utf-8' in d:
                return d[i + '.utf-8'].decode('utf8')
            return self.decode(d[i])

        def parse_metadata(self, data):
            """Parse bencoded torrent metadata into a dict, or return None.

            None is returned when *data* cannot be decoded or has no name.
            The result holds 'name', 'length', 'data_hash' (MD5 of the pieces
            field) and, for multi-file torrents, 'files'.
            """
            info = {}
            self.encoding = 'utf8'
            try:
                torrent = bdecode(data)
                if not torrent.get('name'):
                    return None
            except Exception:
                return None
            detail = torrent
            info['name'] = self.decode_utf8(detail, 'name')
            if 'files' in detail:
                info['files'] = []
                for entry in detail['files']:
                    path_key = 'path.utf-8' if 'path.utf-8' in entry else 'path'
                    v = {'path': self.decode('/'.join(entry[path_key])), 'length': entry['length']}
                    if 'filehash' in entry:
                        v['filehash'] = entry['filehash'].encode('hex')
                    info['files'].append(v)
                info['length'] = sum(f['length'] for f in info['files'])
            else:
                info['length'] = detail['length']
            info['data_hash'] = hashlib.md5(detail['pieces']).hexdigest()
            return info

        def got_torrent(self):
            """Store one downloaded torrent, if any is waiting in metadata_queue.

            Inserts are committed in batches: every 50th successful insert
            triggers a commit. Previously the counter was never incremented,
            so the batching check was always true and every insert committed.
            Rows still pending are also committed by download_metadata().
            """
            if self.metadata_queue.qsize() == 0:
                return
            binhash, address, data, start_time = self.metadata_queue.get()
            if not data:
                return
            try:
                info = self.parse_metadata(data)
                if not info:
                    return
            except Exception:
                traceback.print_exc()
                return

            # NOTE(review): this is local time, whereas download_metadata()
            # writes last_seen from datetime.utcnow(); confirm which is intended.
            now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

            info['info_hash'] = binhash.encode('hex')
            # Tags and classification are filled in elsewhere; start them unset.
            info['tagged'] = False
            info['classified'] = False
            info['requests'] = 1
            info['last_seen'] = now
            info['create_time'] = now
            info['source_ip'] = address[0]

            if info.get('files'):
                # Ignore files whose path starts with '_' unless nothing else is left.
                files = [f for f in info['files'] if not f['path'].startswith('_')]
                if not files:
                    files = info['files']
            else:
                files = [{'path': info['name'], 'length': info['length']}]
            # The largest file determines extension and category.
            files.sort(key=lambda f: f['length'], reverse=True)
            bigfname = files[0]['path']
            info['extension'] = metautils.get_extension(bigfname).lower()
            info['category'] = metautils.get_category(info['extension'])

            try:
                try:
                    print("\nSaved %s %s %s s %s" % (
                        info['info_hash'], info['name'], time.time() - start_time, address[0]))
                except Exception:
                    # The name may not be printable on this console.
                    print("\nSaved %s" % (info['info_hash'],))
                self.dbcurr.execute('INSERT INTO search_hash(info_hash,category,data_hash,name,extension,classified,source_ip,tagged,' +
                    'length,create_time,last_seen,requests) VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)',
                    (info['info_hash'], info['category'], info['data_hash'], info['name'], info['extension'], info['classified'],
                    info['source_ip'], info['tagged'], info['length'], info['create_time'], info['last_seen'], info['requests']))
                self.count += 1
                if self.count % 50 == 0:
                    self.dbconn.commit()
                    if self.count > 100000:
                        self.count = 0
            except Exception:
                print("%s save error %s %s" % (self.name, self.name, info))
                traceback.print_exc()
                return
    
    if __name__ == "__main__":
        # Start the metadata workers first so the queues are drained as soon
        # as the DHT node begins to feed them.
        master = Master()
        master.start_work(max=150)

        # Start the DHT node: the receive loop runs on its own daemon thread,
        # while the find_node sender loop keeps the main thread busy forever.
        dht = DHTServer(master, bind_ip="0.0.0.0", bind_port=6881, max_node_qsize=200)
        dht.start()
        dht.auto_send_find_node()
    

    注:不好意思, V2EX 主题字数不能超过 20000 ,我没法写下去了,此贴首发博客园,我把链接贴出来,可以过去看看,不想过去把源码下下来慢慢看吧,开篇我已经给了地址,谢谢。 博客园地址:http://www.cnblogs.com/huangxie/p/5550680.html 这个爬虫还是耗费了本人和其他网上高手的很多时间的,请看到这篇博客的朋友保持钻研精神,开源精神,多多交流,秉承分享。本人建立个 qq 群作为去转盘网的官方群,人数现在也不多,如果有兴趣的话来逛逛吧,多个粉丝去转盘多一份热闹, qq 群号: 512245829

    6 条回复    2016-08-22 21:41:14 +08:00
    snsd
        1
    snsd  
       2016-08-22 10:24:12 +08:00 via iPhone
    楼主方便的话能不能做个爱奇艺 1080p 视频的在线解析?
    3023369823
        2
    3023369823  
    OP
       2016-08-22 10:44:02 +08:00
    @snsd 这个没有研究过,你有好的思路可以说下
    Allianzcortex
        3
    Allianzcortex  
       2016-08-22 11:18:01 +08:00
    我最近也在看 DHT 协议,准备实时监听。不过 LZ 的代码里可以优化一下 global 全局变量和裸 except 这块( no offense 哈)。
    3023369823
        4
    3023369823  
    OP
       2016-08-22 11:21:40 +08:00
    @Allianzcortex 是的,代码写了好久了,有时间再看看,谢谢你的指出
    hmlbr
        5
    hmlbr  
       2016-08-22 17:34:07 +08:00
    有空研究下,赞 lz
    3023369823
        6
    3023369823  
    OP
       2016-08-22 21:41:14 +08:00
    @hmlbr 好的
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   1079 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 23ms · UTC 19:51 · PVG 03:51 · LAX 11:51 · JFK 14:51
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.