精华内容
下载资源
问答
  • 斗鱼弹幕抓取及实时弹幕数据可视化,分为履带(弹幕抓取),服务器(弹幕统计数据服务器),web(统计数据可视化前端)三部分。 正在开发中…… 运行 安装依赖 # dybarrage-crawler pip install -r requirements.txt...
  • 豆雨弹幕专业版 ...抓取弹幕记录查询,下载指定抓取记录所抓取弹幕,下载全部弹幕 自定义关键字统计 铁粉(发送弹幕最多)统计 高光时刻实时自动捕获,记录,查询,下载高光时刻弹幕 弹幕发送速度实
  • 爬虫这东西很实用,意义不下于你学会做PPT和Excel。...抓取链接:忘了,B站随便点开的一个视频;主要思路:1. 这个是做到现在我觉得挺好玩的一个抓取,不是做的爬虫,就直接在chrome查找到xml文件,点击打开位New Ta...

    爬虫这东西很实用,意义不下于你学会做PPT和Excel。真正掌握方法论很难,需要时间和不断的实践。但掌握一门小工具,投入和产出比在我看来是比较惊人的。

    爬虫其实没有这么难。最近在三节课上上了一堂陈大欣老师的课,随手做做课程笔记+作业。

    抓取链接:忘了,B站随便点开的一个视频;

    主要思路:

    1. 这个是做到现在我觉得挺好玩的一个抓取,不是做的爬虫,就直接在chrome查找到xml文件,点击打开位New Table;

    2. 打开Excel-数据选项卡-自网站,粘贴到这个tab的链接,然后就能做了;

    3. 关于时间,需要换算一下,公式在表里面;

    4. 另外,这张表里有两个时间,一个是发布时间,即2016年6月28日(在下面不知道怎么公式没显示出来,但是下载后的excel发布时间这一栏显示的就是正常年月日+时间);另一个是时长,即弹幕在这个视频中是什么时候发的。

    5. 这两个都涉及到excel数字格式的使用,例如时长是 1313s,如何转换成X分X秒呢?可以用到两个公式。

    =INT(H16/60)&"分"&MOD(H16,60)&”秒”,Int函数取整,Mod函数求余,&连接;

    =TEXT(H16/(60*60*24),"m's”)

    6. 数据透视表,汇总,计数,可以看到多少分多少秒,有多少弹幕;这个东西是最好玩的,如果没有耐心看完整个视频,只需要在挑个弹幕数量最多的时间点High一下就行了。

    beefbc4e1a1f?utm_campaign=maleskine&utm_content=note&utm_medium=seo_notes&utm_source=recommendation

    beefbc4e1a1f?utm_campaign=maleskine&utm_content=note&utm_medium=seo_notes&utm_source=recommendation

    beefbc4e1a1f?utm_campaign=maleskine&utm_content=note&utm_medium=seo_notes&utm_source=recommendation

    展开全文
  • asyncore斗鱼弹幕抓取

    2020-01-09 00:20:32
    斗鱼弹幕抓取 斗鱼api网上开放的。 数据发送和接收流程: 先发送长度,在发送数据,接收数据就是先接收长度,后接收数据。

    斗鱼弹幕抓取

    斗鱼api网上开放的。
    在这里插入图片描述

    数据发送和接收流程:

    先发送数据长度,在发送数据,接收数据就是先接收长度,后接收数据。
    在这里插入图片描述

    对数据包封装:
    对数据包进行对象化封装,对数据的封装方便以后使用,实现对象和二进制数据之间的转换。

    • 通过参数构建数据包对象
    class DataPacket():
        """封装数据包"""
        def __init__(self,type=DATA_PACKET_TYPE_SEND,content=""): #默认是发送数据包,正文内容content
            #数据包的类型
            self.type=type
            #数据部分内容
            self.content=content
            #加密标记
            self.encrypt_flag=0
            #保留字段
            self.preserve_flag=0
    
    • 获取数据包长度的方法
      看上面图,4个字节表示数据的消息长度,2个字节表示消息类型,1个字节加密字段,1个字节保留字段,再加上数据部分的长度len(self.content.encode(“utf-8”)),
      还要加上’\0’,
      在这里插入图片描述
      总长度:4+2+1+1+len(self.content.encode(“utf-8”))+1。
     def get_length(self):
            """获取数据包的长度,为以后发送数据包做准备"""
            
    
            return 4+2+1+1+len(self.content.encode("utf-8"))+1
    
    • 实现获取二进制数据的方法
        def get_bytes(self):
            """通过数据包转换成二进制数据"""
            data=bytes()
            #构建四个字节的消息长度
            data_packet_length=self.get_length()
            #to_bytes 把一个整形数据转换成二进制数据
            # 第一个参数 表示需要转换的二进制数据占几个字节
            #byteorder 第二个参数描述字节序
            #signed 设置是否有符号
            
            #处理消息长度
            data+=data_packet_length.to_bytes(4,byteorder="little",signed=False)
            #处理消息类型
            data+=self.type.to_bytes(2,byteorder="little",signed=False)
            #处理加密字段
            data+=self.encrypt_flag.to_bytes(1,byteorder="little",signed=False)
            #处理保留字段
            data+=self.preserve_flag.to_bytes(1,byteorder="little",signed=False)
               #处理保留字段
            data+=self.preserve_flag.to_bytes(1,byteorder="little",signed=False)
    
            #处理数据内容
            data+=self.content.encode("utf-8")
    
            #添加\0数据
            data+=b'\0'
    
    

    实现发送数据包:
    在这里插入图片描述
    1.构建发送数据包队列容器

    #构建发送数据包的队列容器
            self.send_queue=Queue()
    

    2.实现回调函数,判断容器中有数据就发送没数据就不发送

        def  writable(self):
            return self.send_queue.qsize()>0
    
        def handle_write(self):
            #从发送数据包队列中获取数据包对象
            dp=self.send_queue.get()
            #获取数据包长度,并且发送给服务器
            dp_length=dp.get_length()
            dp_length_data=dp_length.to_bytes(4,byteorder="little",signed=False)
            self.send()
            #发送数据包二进制数据
            self.send(dp.get_bytes())
            #数据发送完成
            self.send_queue.task_done()
    

    3.实现登录函数
    在这里插入图片描述

        def login_room_id(self,room_id):
            """登录服务器"""
            #构建登录数据包
    
            content=" type @= loginreq/roomid @={}/".format(room_id)
            login_dp=DataPacket(DATA_PACKET_TYPE_SEND,content=content)
    
            #把数据包添加到发送数据包容器中
            self.send_queue.put(login_dp)
    

    实现接收数据包:
    在这里插入图片描述
    1.构建数据包队列

     #存放接收数据包对象
            self.recv_queue=Queue()
            
    

    2.读取回调函数中的读取数据

        def handle_read(self):
            # 读取长度,二进制数据
            data_length_data = self.recv(4)
            # 通过二进制获取 length具体数据
            data_length = int.from_bytes(data_length_data, byteorder='little', signed=False)
            # 通过数据包的长度获取数据
            data = self.recv(data_length)
            # 通过二进制数据构建数据包对象
            dp = DataPacket(data_bytes=data)
            # 把数据包放入接收数据包容器中
            self.recv_queue.put(dp)
    

    3.构建处理数据包的线程

     #构建一个专门处理接收数据包容器中的数据包线程
            self.callback_thread=threading.Thread(target=self.do_callback)
            self.callback_thread.setDaemon(True)
            self.callback_thread.start()
    

    然后封装do_callback函数。

        def  do_callback(self):
            """专门负责处理接收数据包容器中的数据"""
            while True:
                #从接收数据包容器中获取数据包
                dp=self.recv_queue.get()
                #对数据进行处理
                print(dp.content)
            pass
    

    实现外部传入回调函数
    通过外部指定回调函数实现自定义数据处理
    ·添加指定函数callback
    构造函数中添加参数

    def __init__(self, host, port,callback=None):
    #定义外部传入的自定义回调函数
            
            self.callback=callback
    

    外部传入自定义回调函数

    def data_callback(dp):
        """
        自定义回调函数
        :param dp: 数据包对象
        :return:
        """
        print("data_callback:",dp.content)
        pass
        
     if __name__ == '__main__':
    		 client = DouyuClient('openbarrage.douyutv.com', 8601,callback=data_callback)
    		 client.login_room_id(4494106)
    		 asyncore.loop(timeout=20)
    

    ·在处理接收数据包的线程中调用回调函数

        def  do_callback(self):
            """专门负责处理接收数据包容器中的数据"""
            while True:
                #从接收数据包容器中获取数据包
                dp=self.recv_queue.get()
                #对数据进行处理
                if  self.callback is not None:
                    self.callback(dp)
                self.recv_queue.task_done()
            pass
    

    数据内容序列化与反序列化
    1.键key和 值value 直接采用’@=‘分割
    2.数组采用 ‘/’ 分割
    3.如果key或者value总含有字符’/’,则使用’@S’转义
    4.如果key或者value总含有字符’@’,则使用’@A’转义

    def  encode_content(content):
        """
        序列化函数
        :param content:  需要序列化的内容
        :return:
        """
        if isinstance(content,str):
            return content.replace(r'@',r'@A').replace(r'/',r'@S')
        elif isinstance(content,dict):
            return r'/'.join(["{}@={}".format(encode_content(k),encode_content(v)) for k,v in content.items()])+r'/'
        elif isinstance(content,list):
            return r'/'.join(([encode_content(data) for data in content]))+r'/'
        return ""
    data=['a',"b"]
    print(encode_content(data))
    

    实现反序列化

    def decode_to_str(content):
        """
        反序列化字符串
        :param content:字符串数据
        :return:
    
        """
        if isinstance(content,str):
            return content.replace(r'@S',r'/').replace('@A',r'@')
        return ""
    def decode_to_dict(content):
        """
        反序列化字典数据
        :param content:被序列化的字符串
        :return:
        """
        ret_dict=dict()
        if isinstance(content,str):
            item_strings=content.split(r'/')
            for item_string in item_strings:
                k_v_list=item_string.split(r'@=')
                if k_v_list is not None and len(k_v_list)>1:
                    k=k_v_list[0]
                    v=k_v_list[1]
                    ret_dict[decode_to_str(k)]=decode_to_str(v)
        return ret_dict
    
    def  decode_to_list(content):
        """
        反序列化列表数据
        :param content:被序列化的字符串
        :return:
        """
        ret_list=[]
        if isinstance(content,str):
            items =content.split(r'/')
            for idx,item in enumerate(items):
                if idx<len(items)-1:
                    ret_list.append(decode_to_str(item))
        return ret_list
    #测试
      # data={"a":"b",
        #       "c":"x@A"}
        data=["a","b/c","c"]
        encode_data=encode_content(data)
        # print(decode_to_dict(encode_data))
        print(encode_data)
        print(decode_to_list(encode_data))
    

    实现心跳机制
    在这里插入图片描述
    作用是让服务器解决假死连接问题,客户端必须每隔45秒发送一次请求,否则会被主动断开连接。
    实现发送心跳函数
    构建心跳数据包
    把数据包添加到发送数据包队列容器中

        def  send_heart_data_packet(self):
            """发送心跳机制"""
            send_data={
                'type':'mrkl'
            }
            content=encode_content(send_data)
            dp=DataPacket(type=DATA_PACKET_TYPE_SEND,content=content)
            self.send_queue.put(dp)
    

    构建心跳线程
    构建心跳线程
    添加触发机制
    添加暂停机制

     def  send_heart_data_packet(self):
            """发送心跳机制"""
            send_data={
                'type':'mrkl'
            }
            content=encode_content(send_data)
            dp=DataPacket(type=DATA_PACKET_TYPE_SEND,content=content)
            self.send_queue.put(dp)
        def start_ping(self):
            """开启心跳"""
            self.ping_running=True
        def stop_ping(self):
            """关闭心跳"""
            self.ping_running=False
        def  do_ping(self):
            """触发心跳"""
            while True:
                if self.ping_running:
                    self.send_heart_data_packet()
                    time.sleep(45)
    
    

    全部代码:

    import asyncore
    import sys
    from queue import Queue
    import threading
    import time
    DATA_PACKET_TYPE_SEND = 689
    DATA_PACKET_TYPE_RECV = 690
    
    def  encode_content(content):
        """
        序列化函数
        :param content:  需要序列化的内容
        :return:
        """
        if isinstance(content,str):
            return content.replace(r'@',r'@A').replace(r'/',r'@S')
        elif isinstance(content,dict):
            return r'/'.join(["{}@={}".format(encode_content(k),encode_content(v)) for k,v in content.items()])+r'/'
        elif isinstance(content,list):
            return r'/'.join(([encode_content(data) for data in content]))+r'/'
        return ""
    
    def decode_to_str(content):
        """
        反序列化字符串
        :param content:字符串数据
        :return:
    
        """
        if isinstance(content,str):
            return content.replace(r'@S',r'/').replace('@A',r'@')
        return ""
    def decode_to_dict(content):
        """
        反序列化字典数据
        :param content:被序列化的字符串
        :return:
        """
        ret_dict=dict()
        if isinstance(content,str):
            item_strings=content.split(r'/')
            for item_string in item_strings:
                k_v_list=item_string.split(r'@=')
                if k_v_list is not None and len(k_v_list)>1:
                    k=k_v_list[0]
                    v=k_v_list[1]
                    ret_dict[decode_to_str(k)]=decode_to_str(v)
        return ret_dict
    
    def  decode_to_list(content):
        """
        反序列化列表数据
        :param content:被序列化的字符串
        :return:
        """
        ret_list=[]
        if isinstance(content,str):
            items =content.split(r'/')
            for idx,item in enumerate(items):
                if idx<len(items)-1:
                    ret_list.append(decode_to_str(item))
        return ret_list
    class DataPacket():
        """封装数据包"""
    
        def __init__(self, type=DATA_PACKET_TYPE_SEND, content="", data_bytes=None):  # 默认是发送数据包,正文内容content
            if data_bytes is None:
    
                # 数据包的类型
                self.type = type
                # 数据部分内容
                self.content = content
                # 加密标记
                self.encrypt_flag = 0
                # 保留字段
                self.preserve_flag = 0
            else:
                self.type =int.from_bytes(data_bytes[4:6],byteorder="little",signed=False)
                self.encrypt_flag=int.from_bytes(data_bytes[6:7],byteorder="little",signed=False)
                self.preserve_flag=int.from_bytes(data_bytes[7:8],byteorder="little",signed=False)
                #构建数据部分
                self.content=str(data_bytes[8:-1],encoding="utf-8")
        def get_length(self):
            """获取数据包的长度,为以后发送数据包做准备"""
    
            return 4 + 2 + 1 + 1 + len(self.content.encode("utf-8")) + 1
    
        def get_bytes(self):
            """通过数据包转换成二进制数据"""
            data = bytes()
            # 构建四个字节的消息长度
            data_packet_length = self.get_length()
            # to_bytes 把一个整形数据转换成二进制数据
            # 第一个参数 表示需要转换的二进制数据占几个字节
            # byteorder 第二个参数描述字节序
            # signed 设置是否有符号
    
            # 处理消息长度
            data += data_packet_length.to_bytes(4, byteorder="little", signed=False)
            # 处理消息类型
            data += self.type.to_bytes(2, byteorder="little", signed=False)
            # 处理加密字段
            data += self.encrypt_flag.to_bytes(1, byteorder="little", signed=False)
            # 处理保留字段
            data += self.preserve_flag.to_bytes(1, byteorder="little", signed=False)
    
            # 处理数据内容
            data += self.content.encode("utf-8")
    
            # 添加\0数据
            data += b'\0'
            return data
    
    
    class DouyuClient(asyncore.dispatcher):
        def __init__(self, host, port,callback=None):
            # 构建发送数据包的队列容器
            # 存放了数据包对象
            self.send_queue = Queue()
    
            # 存放接收数据包对象
            self.recv_queue = Queue()
            #定义外部传入的自定义回调函数
            self.callback=callback
    
            asyncore.dispatcher.__init__(self)
            self.create_socket()
            address = (host, port)
            self.connect(address)
            #构建一个专门处理接收数据包容器中的数据包线程
            self.callback_thread=threading.Thread(target=self.do_callback)
            self.callback_thread.setDaemon(True)
            self.callback_thread.start()
            #构建心跳线程
            self.heart_thread=threading.Thread(target=self.do_ping)
            self.heart_thread.setDaemon(True)
            self.ping_running=False
    
            pass
    
        def handle_connect(self):
            print("连接成功")
            self.start_ping()
    
        def writable(self):
            return self.send_queue.qsize() > 0
    
        def handle_write(self):
            # 从发送数据包队列中获取数据包对象
            dp = self.send_queue.get()
            # 获取数据包长度,并且发送给服务器
            dp_length = dp.get_length()
    
            dp_length_data = dp_length.to_bytes(4, byteorder="little", signed=False)
    
            self.send(dp_length_data)
            # 发送数据包二进制数据
            self.send(dp.get_bytes())
            # 数据发送完成
            self.send_queue.task_done()
            pass
    
        def readable(self):
            return True
    
        def handle_read(self):
            # 读取长度,二进制数据
            data_length_data = self.recv(4)
            # 通过二进制获取 length具体数据
            data_length = int.from_bytes(data_length_data, byteorder='little', signed=False)
            # 通过数据包的长度获取数据
            data = self.recv(data_length)
            # 通过二进制数据构建数据包对象
            dp = DataPacket(data_bytes=data)
            # 把数据包放入接收数据包容器中
            self.recv_queue.put(dp)
    
        def handle_error(self):
            t, e, trace = sys.exc_info()
            print(e)
            self.close()
    
        def handle_close(self):
            self.stop_ping()
            print("连接关闭")
            self.close()
    
        def login_room_id(self, room_id):
            """登录服务器"""
            self.room_id=room_id
            send_data={
                'type':"loginreq",
                "roomid":str(room_id)
            }
            # 构建登录数据包
            content = encode_content(send_data)
            login_dp = DataPacket(DATA_PACKET_TYPE_SEND, content=content)
    
            # 把数据包添加到发送数据包容器中
            self.send_queue.put(login_dp)
        def join_room_group(self):
            """
            加入弹幕分组
            :return:
            """
            send_data={
                'type':'joingroup',
                'rid':str(self.room_id),
                'gid':'-9999'
            }
            content=encode_content(send_data)
            dp=DataPacket(type=DATA_PACKET_TYPE_SEND,content=content)
            self.send_queue.put(dp)
            pass
        def  send_heart_data_packet(self):
            """发送心跳机制"""
            send_data={
                'type':'mrkl'
            }
            content=encode_content(send_data)
            dp=DataPacket(type=DATA_PACKET_TYPE_SEND,content=content)
            self.send_queue.put(dp)
        def start_ping(self):
            """开启心跳"""
            self.ping_running=True
        def stop_ping(self):
            """关闭心跳"""
            self.ping_running=False
        def  do_ping(self):
            """触发心跳"""
            while True:
                if self.ping_running:
                    self.send_heart_data_packet()
                    time.sleep(45)
    
    
        def  do_callback(self):
            """专门负责处理接收数据包容器中的数据"""
            while True:
                #从接收数据包容器中获取数据包
                dp=self.recv_queue.get()
                #对数据进行处理
                if  self.callback is not None:
                    self.callback(self,dp)
                self.recv_queue.task_done()
            pass
    def data_callback(dp):
        """
        自定义回调函数
        :param dp: 数据包对象
        :return:
        """
        resp_data=decode_to_dict(dp.content)
        print(resp_data )
        if resp_data["type"]=="loginres":
            #调用加入分组请求
            print("登录成功",resp_data)
            client.join_room_group()
        elif resp_data['type'] =="chatmsg":
            print("{}:{}".format(resp_data["nn"],resp_data["txt"]))
        elif resp_data['type']=="onlinegift":
            print("暴击鱼丸")
        elif resp_data['type']=="uenter":
            print("{}进入房间".format(resp_data['nn']))
    
    
        print("data_callback:",dp.content)
        pass
    
    if __name__ == '__main__':
        client = DouyuClient('openbarrage.douyutv.com', 8601,callback=data_callback)
        client.login_room_id(4494106)
        asyncore.loop(timeout=20)
    
    

    这就是通过asyncore异步抓取弹幕,代码代码是几个月前写的,现在就是端口号变了,连接被拒绝没现在不能连接,但是可以了解asyncore模块的全部流程,在后面我们重新发表斗鱼弹幕抓取selenium抓取的相关代码

    展开全文
  • 前文链接:mawkish:(2020年最新)斗鱼弹幕抓取及实时弹幕数据可视化(一)​zhuanlan.zhihu.com上一篇中我们使用Python的websocket库成功连接上了斗鱼的弹幕服务器,最后讲到了我们连接成功后还需要给服务器发一条...

    6d0f920fc48a8cb492750cb9f072413f.png

    前文链接:

    mawkish:(2020年最新)斗鱼弹幕抓取及实时弹幕数据可视化(一)zhuanlan.zhihu.com
    1aad739e5971c4a64890919b95de3d71.png

    上一篇中我们使用Python的websocket库成功连接上了斗鱼的弹幕服务器,最后讲到了我们连接成功后还需要给服务器发一条“登录请求消息”,来告诉服务器我们不是“占着茅坑不拉屎”,以及必要的选项,比如目标房间号等等。

    阅读《协议》可知,客户端与弹幕服务器之间传输的数据并不是原文,而是在应用层经过了特殊的协议封装。

    0baa4403f9b3973865b0a6afd1dbf0f0.png

    如果学习过计算机网络的话这样的格式再熟悉不过,如果没学过也没事,听我慢慢道来。第一行“字节”只是为了表示在这个表中一行的宽度为4个字节,并不是实际的数据,真正的数据从第二行开始。

    根据《协议》可得,在原数据的基础上,我们需要加上消息长度、特定的头部与尾部。消息长度为4B,头部结构为(4B消息长度、2B消息、1B加密字段、1B保留字段),尾部为(1B的'0')。(注意消息长度重复了两次)

    这儿会涉及到一个字节流以及小端序的概念。在计算机网络中是以字节流进行数据传输的,因此传输字符数据之前需要将字符流转化为字节流,在对方接收到字节数据后再将其转化为字符数据,也就是人类可读的文字,大家常遇到的“乱码”问题其实就是字节流转化为字符流时出了问题。Python中提供了encode()方法来将字符串转化为字节流、decode()方法将字节流转化为字符串、以及bytearray([1, 2, 3])方法来将数组中的整数拼接为字节流。

    而小端序指的是数据的高字节保存在内存的高地址中,而数据的低字节保存在内存的低地址中。有点抽象,来看个例子:《协议》中规定协议中消息类型固定为2字节小端整数,而客户端发送的消息类型为689。那么689对应的2字节小端整数是什么呢?首先将689转化为十六进制0x02b1,按字节顺序拆分即[0x02, 0xb1],按这样的顺序放在内存中是大端序;如果倒着拆分即[0xb1, 0x02],这样放在内存中就是小端序。

    回到我们的需求:给服务器发一条“登录请求消息”,细化来说分为以下几个步骤:

    1. 构造登录请求消息字符串
    2. 将字符串转化为字节流数据,并且按照斗鱼协议格式进行封装(也就是加上头部和尾部)
    3. 调用websocket库的相关方法将数据发给服务器

    下面直接上代码:

    # 与前文(即系列第一篇文章)一样的代码省略
    
    # 将字符串数据按照斗鱼协议封装为字节流
    def dy_encode(msg):
        # 头部8字节,尾部1字节,与字符串长度相加即数据长度
        # 为什么不加最开头的那个消息长度所占4字节呢?这得问问斗鱼^^
        data_len = len(msg) + 9
        # 字符串转化为字节流
        msg_byte = msg.encode('utf-8')
        # 将数据长度转化为小端整数字节流
        len_byte = int.to_bytes(data_len, 4, 'little')
        # 前两个字节按照小端顺序拼接为0x02b1,转化为十进制即689(《协议》中规定的客户端发送消息类型)
        # 后两个字节即《协议》中规定的加密字段与保留字段,置0
        send_byte = bytearray([0xb1, 0x02, 0x00, 0x00])
        # 尾部以'0'结束
        end_byte = bytearray([0x00])
        # 按顺序拼接在一起
        data = len_byte + len_byte + send_byte + msg_byte + end_byte
    
        return data
    
    # 发送登录请求消息
    def login(ws):
        msg = 'type@=loginreq/roomid@=1126960/'
        msg_bytes = dy_encode(msg)
        ws.send(msg_byte)
    
    def on_open(ws):
        print('open')
        # 在连接上服务器后发送登录请求消息
        login(ws)
    
    def on_message(ws, message):
        # 将字节流转化为字符串,忽略无法解码的错误(即斗鱼协议中的头部尾部)
        print(message.decode(encoding='utf-8', errors='ignore'))

    运行之后首先会输出一个"open"代表连接上服务器;之后会接着输出一串含有乱码的字符串,其中有"type@=loginres"等字样,代表服务器成功接收到了我们发出的登录请求消息,并且给了我们回馈,登录成功!但是还是没有弹幕数据,过了一段时间后我们的连接还是被服务器给踢了。

    3f17a3155ff1b86191455ab4dc97bda4.png

    如果是直播老粉的话应该遇到过这种情况:打开一个直播间看了很久,期间你没有动过鼠标和键盘,就这样看直播看入迷了,突然斗鱼会提醒你:“你还在电脑前吗?”由此我们可以联想到,斗鱼服务器也会这样。如果我们的客户端就这样一直收着弹幕消息,但是一直不给服务器点反馈,时间久了服务器也有点疑惑,“这客户端是不是已经溜了啊?”,然后就把你客户端给踢了。这个我们后续会讲到,怎么给服务器反馈,让服务器知道,“其实我一直都在。”

    可能有人还会问,这服务器也没给我发弹幕消息啊,我这不啥都没收到么。让我们重回《协议》第25页,其中写到:

    b577f5e465529dbf6679647d03fd945d.png

    所以我们还差一条“入组消息”,有点类似TCP三次握手,通过三条消息来确保客户端与服务器之间的连接成功。根据协议要求以及前文经验,我们直接写出下列代码:

    # 同样已有代码省略,只写出有改动部分
    
    # 发送入组消息
    def join_group(ws):
        msg = 'type@=joingroup/rid@=1126960/gid@=-9999/'
        msg_bytes = dy_encode(msg)
        ws.send(msg_bytes)
    
    def on_open(ws):
        print('open')
        login(ws)
        # 登录后发送入组消息
        join_group(ws)

    再次运行,弹幕消息就源源不断的传输给你啦(虽然过段时间还是会被踢)。我们终于看到了熟悉的“弹幕文化”以及直播间的观众们与主播“友好的互动”。希望到这里大家会有一点成就感,然后趁热打铁,自己动脑筋把这些“乱糟糟”的数据解析出来,得到自己想要的干干净净的弹幕。解析用到的知识点并不多,大家利用好协议头部的“消息长度”,以及一些基本的字符串处理函数即可。

    ec238a3f0270429fe46682aada1a3425.png

    我们下篇再见!附上Github仓库地址:

    https://github.com/Crawler995/DouyuBarragegithub.com

    (2020/03/01更:上面这个项目(即第一版)不再维护,自己重构了个第二版DouyuBarrage-Pro,提供弹幕抓取、抓取记录查询、弹幕下载、自定义关键词统计、铁粉统计、高光时刻自动捕获、弹幕实时发送速度可视化、高频弹幕词云等功能。正在开发中,打造企业级应用的使用体验!)

    https://github.com/Crawler995/DouyuBarrage-Progithub.com
    展开全文
  • 背景因为想拿到一些知乎弹幕的数据 以及做一个直播播报机器人,所以最近在研究知乎直播的弹幕分析抓取比较简单,不多说了...都是正常的操作但是 拿到的数据却很奇怪为了演示方便,我们以 rest 接口示范,本质上和 ...

    背景

    因为想拿到一些知乎弹幕的数据 以及做一个直播播报机器人,所以最近在研究知乎直播的弹幕

    分析

    抓取比较简单,不多说了...都是正常的操作

    但是 拿到的数据却很奇怪

    为了演示方便,我们以 rest 接口示范,本质上和 websocket 接口是一样的。

    我们以直播间 11529 为例子

    拿取弹幕的接口是: https://www.zhihu.com/api/v4/drama/theaters/11529/recent-messages

    1680ddcde8ec5bab26293e6439eb1f51.png直播间弹幕返回数据截图

    可以看到弹幕数据应该在 messages 里面,但是数据好像经过了某种加密

    js 大搜查

    首先全局搜索 recent-messages,找到需要的 js 文件(这边也就是查找哪个 js 请求了拿弹幕的网址)

    把 js 文件下载到本地格式化后,搜索 recent-messages

    ddda7e7e7c9c9bdd3dc3e163ec952a83.png

    搜索LOAD_RECENT_MESSAGES

    找到了如何解析 message 的第一步 base64 解密

    js 中 atob 函数解释[1]

    a2d3cc33628936da0a005ded1bf95da5.png

    并且 转换后的结果传给了函数 p

    继续搜索p 往上搜索(记得搜索模式选择全词匹配与区分大小写) 要不然搜索结果太多了...

    运气好 上面第一个就是

    89e4a6c68a56504818de11f7c8a8f553.png

    为了验证可以替换知乎 js 到你本地的 js

    加两行console.log就行了

    代码如下...

    function p(e){

    console.log("before:", e);

    var t = d.EventMessage.decode(e),

    n = t.eventCode,

    r = t.event;

    console.log("after:", t);

    复制代码

    可以发现这就是我们想要的

    那么现在只要搞清楚 EventMessage.decode 这个方法干了啥就可以了...

    然后搜到了具体的代码

    0324d80294e130f5e55804f8dad14086.png

    就一步一步 debug,发现好像是某种编码规范?

    难道是知乎自己定义的吗...

    在这边搞了一周...还没有搞明白

    大概说下 我迷惑的点在哪

    aff52263182a873d9777ca1406003b2c.png

    如上这个 Uint8Array

    先是对第一位 >>> 操作,判断这个 字节表示的是 什么含义

    然后后面的 xxx 个字节表示具体的值,但是 xxx 个字节到底是多少个,是怎么区分的 我没有弄明白

    特别是如下 这三个明明都是 int64,他们的字节长度却不一样

    timestampMs 是 6 个字节

    theaterId 是 2 个字节

    dramaId 是 9 个字节

    我拿个小本本一边 debug 一边记...(本来字段少的话,看多了是可以直接找到规律 这样解决的,但是其中一个字段event是个字典,有 40 个 key...

    我看到代码的时候 就炸了...

    b6382e3cae28bfa0ba602d1cd96ea963.png

    所以我就想 算了 不了解它到底怎么实现的把,我直接吧这段 js 抠出来...然后搭个 nodejs 的服务得了

    扣 js

    扣的时候 还计较简单,除了这一句的s

    e instanceof s || (e = s.create(e));

    复制代码

    c95a88a9f5a2fd90e9ea7983c5885eca.png

    这个 s 到这我就找不到它到底从哪来的了

    所以我就只能 google.(搜了好多次)

    b5f79f88e8ee694568ee0d9e493d00b9.png

    真是惊喜!发现竟然是protobuf[2]

    所以这个所谓的加密 是一种通用的协议...

    至此,问题就简单了

    Protocol Buffers

    官方的定义如下:

    Protocol buffers 是一种与语言无关、与平台无关的可扩展机制,用于序列化结构化数据。

    更多介绍可以去看protocol-buffers 官网[3]

    下面的内容来自 Burpsuite 中 protobuf 数据流的解析[4]

    Varint 编码

    Protobuf 的二进制使用 Varint 编码。Varint 是一种紧凑的表示数字的方法。它用一个或多个字节来表示一个数字,值越小的数字使用越少的字节数。这能减少用来表示数字的字节数。

    Varint 中的每个 byte 的最高位 bit 有特殊的含义,如果该位为 1,表示后续的 byte 也是该数字的一部分,如果该位为 0,则结束。其他的 7 个 bit 都用来表示数字。因此小于 128 的数字都可以用一个 byte 表示。大于 128 的数字,比如 300,会用两个字节来表示:1010 1100 0000 0010。

    下图演示了 protobuf 如何解析两个 bytes。注意到最终计算前将两个 byte 的位置相互交换过一次,这是因为 protobuf 字节序采用 little-endian 的方式。

    d9642a36af89bfd3417a4b77d5c97496.png

    所以我们上面那个疑惑解决了...

    就是怎么确定某个字段到底应该几个字节(或者说现在能划分数据了)

    数值类型

    Protobuf 经序列化后以二进制数据流形式存储,这个数据流是一系列 key-Value 对。Key 用来标识具体的 Field,在解包的时候,Protobuf 根据 Key 就可以知道相应的 Value 应该对应于消息中的哪一个 Field。

    Key 的定义如下:

    (field_number << 3) | wire_type

    Key 由两部分组成。第一部分是 field_number,比如消息 tutorial .Person 中 field name 的 field_number 为 1。第二部分为 wire_type。表示 Value 的传输类型。Wire Type 可能的类型如下表所示:

    Type

    Meaning

    Used For

    Varint

    int32, int64, uint32, uint64, sint32, sint64, bool, enum

    1

    64-bit

    fixed64, sfixed64, double

    2

    Length-delimi

    string, bytes, embedded messages, packed repeated fields

    3

    Start group

    Groups (deprecated)

    4

    End group

    Groups (deprecated)

    5

    32-bit

    fixed32, sfixed32, float

    以数据流:08 96 01 为例分析计算 key-value 的值:

    #!bash

    08 = 0000 1000b

    => 000 1000b(去掉最高位)

    => field_num = 0001b(中间4位), type = 000(后3位)

    => field_num = 1, type = 0(即Varint)

    96 01 = 1001 0110 0000 0001b

    => 001 0110 0000 0001b(去掉最高位)

    => 1 001 0110b(因为是little-endian)

    => 128+16+4+2=150

    复制代码

    最后得到的结构化数据为:

    1:150

    其中 1 表示为 field_num,150 为 value。

    手动反序列化

    1f8b3103e5c6cae6c5849d7e98cf3745.png

    以上面例子中序列化后的二进制数据流进行反序列化分析:

    #!bash

    0A = 0000 1010b => field_num=1, type=2;

    2E = 0010 1110b => value=46;

    0A = 0000 1010b => field_num=1, type=2;

    07 = 0000 0111b => value=7;

    复制代码

    读取 7 个字符“Vincent”;

    #!bash

    10 = 0001 0000 => field_num=2, type=0;

    09 = 0000 1001 => value=9;

    1A = 0001 1010 => field_num=3, type=2;

    10 = 0001 0000 => value=16;

    复制代码

    读取 10 个字符“Vincent@test.com”;

    #!bash

    22 = 0010 0010 => field_num=4, type=2;

    0F = 0000 1111 => value=15;

    0A = 0000 1010 => field_num=1, type=2;

    0B = 0000 1011 => value=11;

    复制代码

    读取 11 个字符“15011111111”;

    #!bash

    10 = 0001 0000 => field_num=2, type=0;

    02 = 0000 0010 => value=2;

    复制代码

    最后得到的结构化数据为:

    #!bash

    1 {

    1: "Vincent"

    2: 9

    3: "Vincent@test.com"

    4 {

    1: "15011111111"

    2: 2

    }

    }

    复制代码

    使用 protoc 反序列化

    实现操作经常碰到较复杂、较长的流数据,手动分析确实麻烦,好在 protoc 加“decode_raw”参数可以解流数据,我实现了一个 python 脚本供使用:

    import subprocess, sys

    import json

    import base64

    def decode(data):

    process = subprocess.Popen(

    ["protoc", "--decode_raw"],

    stdin=subprocess.PIPE,

    stdout=subprocess.PIPE,

    stderr=subprocess.PIPE,

    )

    output = error = None

    try:

    output, error = process.communicate(data)

    except OSError:

    pass

    finally:

    if process.poll() != 0:

    process.wait()

    return output

    with open(sys.argv[1], "rb") as f:

    data = f.read()

    print('',decode(data))

    复制代码

    回到知乎直播

    那么就先测试解析一条吧

    import subprocess, sys

    import json

    import base64

    def decode(data):

    process = subprocess.Popen(

    ["protoc", "--decode_raw"],

    stdin=subprocess.PIPE,

    stdout=subprocess.PIPE,

    stderr=subprocess.PIPE,

    )

    output = error = None

    try:

    output, error = process.communicate(data)

    except OSError:

    pass

    finally:

    if process.poll() != 0:

    process.wait()

    return output

    a1 = "CAESpgMKowMKhgMKIDQwYjQ3Y2NiZmM0NDc1YjAxOGE1YTQxN2UxY2Y5ODk3EhLlsI/pgI/mmI7niLHlvrfljY4aDGR1LXlhby0xMy04NiJBaHR0cHM6Ly9waWM0LnpoaW1nLmNvbS92Mi1kMDYxNjFiMWQzOWNkNjRlYmRhNDBmOWMwNjVhNmNhNV94cy5qcGcqswEIiVoSCemFkueqneeqnRgBIAAoJTABOpoBCAEQABpAaHR0cHM6Ly9waWMxLnpoaW1nLmNvbS92Mi0xZDEyNTg1YzdhOTY2MTNkM2JlZjQxMTcyY2Q4ZWYxNV9yLnBuZyJAaHR0cHM6Ly9waWM0LnpoaW1nLmNvbS92Mi1iMDM1ZWRkNTA3NjgwNzU3MmJkNGU3YTg5MjRjZTEzYl9yLnBuZyoHIzcyQkJGRioHIzAwODRGRjJHCAkQjGAaQGh0dHBzOi8vcGljNC56aGltZy5jb20vdjItODI1NTRlYzgzYmViMzJlOWVjNDQxNGY0YzYyMmFjMmNfci5wbmcQARoM5oiR5Lmf6KeJ5b6XIICA8cTaz6SEERiplO2Pki4giVoogKDrtNOUlYQRMhUxLTEyMjczOTE5NjY4NTU4Mzk3NDQ4AQ=="

    message = base64.b64decode(a1)

    print(decode(message))

    复制代码

    结果如下

    Out[7]: b'1: 1\n2 {\n 1 {\n 1 {\n 1: "40b47ccbfc4475b018a5a417e1cf9897"\n 2: "\\345\\260\\217\\351\\200\\217\\346\\230\\216\\347\\210\\261\\345

    \\276\\267\\345\\215\\216"\n 3: "du-yao-13-86"\n 4: "https://pic4.zhimg.com/v2-d06161b1d39cd64ebda40f9c065a6ca5_xs.jpg"\n 5 {\n 1: 1152

    9\n 2: "\\351\\205\\222\\347\\252\\235\\347\\252\\235"\n 3: 1\n 4: 0\n 5: 37\n 6: 1\n 7 {\n 1: 1\n

    2: 0\n 3: "https://pic1.zhimg.com/v2-1d12585c7a96613d3bef41172cd8ef15_r.png"\n 4: "https://pic4.zhimg.com/v2-b035edd5076807572bd4e7a8924c

    e13b_r.png"\n 5: "#72BBFF"\n 5: "#0084FF"\n }\n }\n 6 {\n 1: 9\n 2: 12300\n 3: "https://pic4.zhimg.com/v2-82554ec83beb32e9ec4414f4c622ac2c_r.png"\n }\n }\n 2: 1\n 3: "\\346\\210\\221\\344\\271\\237\\350\\247\\211\\345\\276\\227"\n 4: 1227391966855839744\n }\n}\n3: 1585413048873\n4: 11529\n5: 1227323967020912640\n6: "1-1227391966855839744"\n7: 1\n'

    复制代码

    可以看到解析成功了,那么后面的工作就比较简单了...

    只要按照知乎的 js 对应出某个位置的具体字段名字就好了...

    最后看一个成功的截图

    92263f312215494c44204a3553fc0065.png

    参考资料

    [1]

    js中atob函数解释: https://developer.mozilla.org/zh-CN/docs/Web/API/WindowBase64/atob

    [2]

    protobuf: https://github.com/protobufjs/protobuf.js

    [3]

    protocol-buffers官网: https://developers.google.com/protocol-buffers

    [4]

    Burpsuite中protobuf数据流的解析: https://wooyun.js.org/drops/Burpsuite%E4%B8%ADprotobuf%E6%95%B0%E6%8D%AE%E6%B5%81%E7%9A%84%E8%A7%A3%E6%9E%90.html

    本文使用 mdnice 排版

    展开全文
  • (下面是此项目的Github链接,大家时间宝贵,如果觉得可以的话再看文章也不迟^^)...提供弹幕抓取、抓取记录查询、弹幕下载、自定义关键词统计、铁粉统计、高光时刻自动捕获、弹幕实时发送速度可视化、高...
  • 前言最近同学要做东西,需要用 B 站的视频对应的弹幕数据做分析,于是请我帮忙爬取 B 站视频的弹幕数据。对于爬虫而言,我们需要找到对应数据所在的接口,找到接口,就可以找到对应的数据。这个时候我们只需要简单的...
  • Bilibili直播弹幕抓取(1):WebSocket

    千次阅读 2018-07-16 14:09:51
    Bilibili直播弹幕抓取(1):WebSocket转载自https://ihomura.cn/2018/05/14/Bilibili%E7%9B%B4%E6%92%AD%E5%BC%B9%E5%B9%95%E6%8A%93%E5%8F%96-1-WebSocket/前言最近有一个学长去分析了B站直播弹幕WebSocket协议,我...
  • Bilibili直播弹幕抓取(2):Fiddler与WireShark转载自:https://ihomura.cn/2018/05/15/Bilibili%E7%9B%B4%E6%92%AD%E5%BC%B9%E5%B9%95%E6%8A%93%E5%8F%96-2-Fiddler%E4%B8%8EWireShark/前言上次提到了 Bilibili 的弹...
  • 原标题:零基础学爬虫(一):不用编程抓取B站弹幕信息 网络爬虫是一种自动获取网页内容的程序,是搜索引擎的重要组成部分。利用网络爬虫可以做到很多很有趣、有用的事情,比如收集某网站的用户信息、抓取论坛发言、各...
  • 某东实时弹幕抓取

    2020-12-27 23:05:34
    对于已经拿到sign只想看弹幕的部分直接跳到4。 用的抓包工具是HttpCanary,随便进入一间直播间,可以看到弹幕走的是websocket,携带的url参数token来自于下面post返回的结果,经过分析只有st,sign,sv三个参数会变,...
  • Bilibili 弹幕抓取系列到这里就结束了,这个过程中虽然绕了很多弯路浪费了大把时间,不过我还是学到了不少知识: WebSocket FiddlerScript 编写 WireShark 基本使用 抓包分析能力(二进制敏感度?) 另外感觉计网光...
  • <div><p>无法连接openbarrage.douyutv.com</p><p>该提问来源于开源项目:davidkingzyb/pccold</p></div>
  • 目前互联网直播弹幕主要是两种技术实现。 1websocket消息通信,js拿到消息再处理到dom中,逆向验证流程,和服务端建立连接后即可,逆向难度较低,消息分明文和加密两种情况,前端无秘密,加密也能找到解密的js...
  • 因此,本场 Chat,以 B 站为抓取对象,主要介绍如何抓取 B 站弹幕,并对此进行简单的分析。 本场 Chat 主要内容: B 站弹幕的原理; Beautiful Soup 使用介绍; 爬虫抓取 B 站弹幕; 对弹幕进行简单分析。 阅读全文:...
  • 可以实现nc窗口与telnet窗口互相通信即可) 三、斗鱼弹幕抓取文档及源码 3.1 Socket编程-asyncore模块基础模板的使用 3.2 斗鱼弹幕抓取实战 一、原理概述及结果展示 1.1 计算机网络基础知识 Socket:在计算机通信...
  • 快乐的抓取弹幕

    2019-05-22 16:04:13
    发现了一个特别快乐的包, 据说支持各大直播网站的弹幕抓取(亲测斗鱼有效) """ @author xiaofei @date 2019-05-22 @desc 抓取弹幕 """ from danmu import DanMuClient dmc = DanMuClient("直播间url") if not dmc....
  • nodejs 抓取斗鱼实时弹幕+离线弹幕 斗鱼直播有两种弹幕,实时弹幕和离线弹幕 实时弹幕:通过webscoket获取 离线弹幕:通过api接口获取 本chat主要讲解如何获取这两种弹幕数据...
  • B站弹幕评论抓取

    2021-04-28 21:18:16
    B站弹幕评论抓取 #!/usr/bin/env python # -*- coding:utf-8 -*- # @Author: Minions # @Date: 2021-04-28 10:59:25 # @Last Modified by: Minions # @Last Modified time: 2021-04-28 21:15:36 import requests ...
  • 推荐:《PHP视频教程》斗鱼弹幕 PHP版本github地址:https://github.com/wjhtime之前写过python获取斗鱼的弹幕 传送门,突发奇想,想要用php来实现。弹幕获取实现起来很简单,用swoole很容易做到了,后期也做了一些...
  • BiliBili直播 弹幕信息抓取

    万次阅读 2016-11-16 21:58:56
    最近由于自己在B站直播的原因,对B站本身提供的实时弹幕不太满意,于是自己抓包写了一个bilibili的弹幕协议,但还有一部分没有完成。留下的坑以后再慢慢填吧。 socket包获拦截工具:Wireshark demo实现语言环境...
  • 原标题:Python实战 | 如何抓取腾讯视频弹幕当代年轻人的快乐是网络给的。如果有人吐槽周末太无聊,他们一定会反驳:是追剧不香吗?是吃鸡不好玩吗?周末辣么短,怎么会无聊呢? 诚然,追剧和游戏已然成了这届年轻人...
  • 作者|GitPython时隔一年,嵩哥带来他的新作《雨幕》。他依旧认真创作,追求高品质,作品在发表之前已听了...02弹幕数据平常我们在看视频时,弹幕是出现在视频上的。实际上在网页中,弹幕是被隐藏在源代码中,以XML的...
  • [Java记录]实时抓取斗鱼弹幕[Java记录]实时抓取斗鱼弹幕// 关键依赖 WebSocket客户端org.java-websocketJava-WebSocket1.5.1import org.java_websocket.client.WebSocketClient;import org.java_websocket.handshake...
  • python爬虫----简单的抓取斗鱼弹幕

    千次阅读 2017-08-08 16:10:03
    于是就想能不能把弹幕抓取下来,带着这个问题我就点开了一个直播间。按照以前学过的方法好像根本没有办法弄到弹幕那一块, 于是赶紧去网上查,发现网上有不少人已经做了。呃......怎么说,大概了解了获取弹幕的原理...
  • 抓取熊猫TV弹幕

    2018-03-28 14:01:18
    标签:弹幕 /熊猫tv /pandatv 前天看到了别人写的抓取斗鱼弹幕的程序,抓取斗鱼弹幕是我很早以前的一个想法,但是无奈不会写,不懂得tcp传过来的那些字节的含义,所以没写出来,当我看到别人写好的代码,我就参照...
  • -无业游民找工作莫得结果的第七天,继续人间小苦瓜和关键词云的爱恨情仇快速指路:1.1腾讯视频弹幕提取并制作关键词云1:弹幕在哪里1.2腾讯视频弹幕提取并制作关键词云2:scrapy弹幕的获取1.3企鹅弹幕提取并制作...

空空如也

空空如也

1 2 3 4 5 ... 9
收藏数 175
精华内容 70
关键字:

弹幕抓取