发布于 

python实现B站直播自动弹幕发送

背景

几个舍友突发奇想在B站尝试直播,但是却没什么人看,没有弹幕烘托气氛,只能自己一条一条发,感觉很麻烦,所以想到怎么使用Python解决。

要求

  1. 有一个B站账号

  2. 稍微有python基础

  3. 懂一点网络抓包

  4. python3.7

获取B站直播弹幕发送的API

  1. 登录B站,随便打开一个直播间,按下 F12,选择 网络,选择 Fetch/XHR
loading-ag-1282
loading-ag-1282
  1. 随便发送一条弹幕,在下面的列表中找到 send 这一个包,选择 标头 ,这就是我们需要的 api,请求方式为:POST
loading-ag-176
loading-ag-176
  1. 往下面查找,找到 Cookie,Origin,Referer和User-Agent。
loading-ag-178
loading-ag-178
  1. 选择 负载,即 POST请求 需要携带的数据
loading-ag-186
loading-ag-186
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# 简单的代码
import requests

url = 'https://api.live.bilibili.com/msg/send'
data = {
'bubble': '0',
'msg': 'test',# 发送的信息
'color': '16777215',
'mode': '1',
'fontsize': '25',
'rnd': '1646460756',# 时间戳
'room_type': '0',
'roomid': '', # 直播间房间号
'csrf': '', # 自己的csrf
'csrf_token': '', 自己的csrf_token
}

headers = {
'cookie': '', # 自己的cookie
'origin': 'https://live.bilibili.com',
'referer': '', # 自己的referer
'user-agent': '' # 自己的user—agent
}
response = requests.post(url=url, data=data, headers=headers)

if response.status_code == 200:
print("消息发送成功!")
# print(response.json())
else:
print(f"消息发送失败,状态码:{response.status_code}")
print(response.text)

# 如果显示:消息发送成功,,但是却没有弹幕发送,可以刷新直播间,
# 如果还是没有,就可以取消` print(response.json())` 的注释,查看一下返回的响应,找出错的地方

进阶

  1. 多准备几个账号,创建一个账号池,每次随机选取一个账号进行发信息。

  2. 连接数据库,创建一个 弹幕文本库,每次随机选择一条进行发送。

  3. 创建一个GUI(但是自己用,好像没必要)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import requests
import time
import random
import pymysql



connection = pymysql.connect(
host="localhost",
user="root",
passwd="",
# auth_plugin='mysql_native_password', # 加密认证方式
database="danmaku", # 连接的数据库
# autocommit=True # 每次对数据库操作完自动提交
)

cursor = connection.cursor()
sql = "SELECT COUNT(*) FROM test"

cursor.execute(sql)
table_length = cursor.fetchone()[0]

sql1 = "select comment from test where id= %s"


url = "https://api.live.bilibili.com/msg/send"



# 加密数据
salt_data1 = {
'jumpfrom': '',
'csrf': '',
'csrf_token': '',
}

salt_data2 = {
'jumpfrom': '',
'csrf': '',
'csrf_token': '',
}



headers1 = {
'cookie':"",
'Origin': 'https://live.bilibili.com',
'Referer': "",
"user-agent":""
}

headers2 = {
'cookie':"",
'Origin': 'https://live.bilibili.com',
'Referer': "",
"user-agent":""

salt_list = [salt_data1, salt_data2]
header_list = [headers1, headers2]


# print(data)
# print(select_header)


while True:
time.sleep(2)
# 获取当前时间戳(以秒为单位)
timestamp = int(time.time())

cursor.execute(sql1, random.randint(1, table_length))
random_comment = cursor.fetchone()[0]

# 通用数据
com_data = {
'bubble': '0',
'msg': '{}'.format(random_comment),
'color': '16777215',
'mode': '1',
'room_type': '0',
'fontsize': '25',
'rnd': '{}'.format(timestamp), # 时间戳
# 30899092
'roomid': '13845655',
}

randint = random.randint(0, 1)

select_salt = salt_list[randint]
select_header = header_list[randint]

data = {}

for data_dict in [com_data, select_salt]:
data.update(data_dict)

response = requests.post(url=url, data=data, headers=select_header)

if response.status_code == 200:
print("消息发送成功!")
# print(response.json())
else:
print(f"消息发送失败,状态码:{response.status_code}")
print(response.text)

# 当然代码还能继续修改,但是作为一个临时使用已经完全足够了。

参考

https://blog.csdn.net/m0_48405781/article/details/123295681


本站由 @Mogra 使用 Stellar 主题创建。
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议,转载请注明出处。