biliob/src/main/java/com/yutou/biliapi/net/WebSocketServer.java
zlzw e4e5696b70 新增了关键词检测
新增了手动暂停的恢复
2024-11-29 15:45:29 +08:00

321 lines
12 KiB
Java

package com.yutou.biliapi.net;
import com.aayushatharva.brotli4j.Brotli4jLoader;
import com.aayushatharva.brotli4j.decoder.Decoder;
import com.aayushatharva.brotli4j.decoder.DecoderJNI;
import com.aayushatharva.brotli4j.decoder.DirectDecompress;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.yutou.biliapi.api.LiveApi;
import com.yutou.biliapi.bean.live.LiveDanmuInfo;
import com.yutou.biliapi.bean.live.LiveRoomConfig;
import com.yutou.biliapi.bean.live.LiveRoomInfo;
import com.yutou.biliapi.bean.login.LoginCookieDatabaseBean;
import com.yutou.biliapi.bean.websocket.WebSocketBody;
import com.yutou.biliapi.bean.websocket.WebSocketHeader;
import com.yutou.biliapi.bean.websocket.live.WSData;
import com.yutou.biliapi.databases.BiliBiliLoginDatabase;
import com.yutou.biliapi.databases.BiliLiveDatabase;
import com.yutou.biliapi.utils.BytesUtils;
import com.yutou.bilibili.services.LiveDatabasesService;
import com.yutou.common.okhttp.HttpBody;
import com.yutou.common.okhttp.HttpCallback;
import com.yutou.common.utils.ConfigTools;
import com.yutou.common.utils.Log;
import jakarta.annotation.Resource;
import okhttp3.Headers;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
import org.springframework.stereotype.Service;
import retrofit2.Response;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.math.BigInteger;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Service
public class WebSocketServer {
ThreadPoolExecutor executor;
private static WebSocketServer instance;
Map<LiveRoomConfig, DanmuTask> roomMap;
private final List<String> userStopList = new ArrayList<>();//手动停止列表
@Resource
LiveDatabasesService liveDatabasesService;
private WebSocketServer() {
roomMap = new HashMap<>();
executor = new ThreadPoolExecutor(2, 4, Long.MAX_VALUE, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100));
}
public boolean checkRoom(LiveRoomConfig roomConfig) {
return roomMap.containsKey(roomConfig);
}
public JSONArray getLiveRoomList() {
JSONArray array = new JSONArray();
array.addAll(roomMap.keySet());
return array;
}
public void clearUserStopList() {
userStopList.clear();
}
public void addRoom(LiveRoomConfig roomConfig, boolean isUser) {
if (!isUser && userStopList.contains(roomConfig.getRoomId().toString())) {
return;
}
if (checkRoom(roomConfig)) {
return;
}
if (isUser) {
userStopList.remove(roomConfig.getRoomId().toString());
}
DanmuTask task = new DanmuTask(roomConfig);
roomMap.put(roomConfig, task);
Log.i("添加websocket任务");
executor.execute(task);
}
public void stopRoom(String roomId, boolean isUser) {
LiveRoomConfig roomConfig = new LiveRoomConfig();
roomConfig.setRoomId(new String(roomId));
if (checkRoom(roomConfig)) {
roomMap.get(roomConfig).close();
roomMap.remove(roomConfig);
}
if (isUser) {
userStopList.add(roomConfig.getRoomId().toString());
}
}
public void removeUserStopList(String roomId) {
userStopList.remove(roomId);
}
public boolean isUserStopList(String roomId) {
return userStopList.contains(roomId);
}
private class DanmuTask implements Runnable {
LiveRoomConfig roomConfig;
WebSocketClientTh client;
public DanmuTask(LiveRoomConfig config) {
this.roomConfig = config;
roomMap.put(roomConfig, this);
}
@Override
public void run() {
LiveApi api = BiliLiveNetApiManager.getInstance().getApi(roomConfig.getLoginUid());
Response<HttpBody<LiveRoomInfo>> execute = null;
try {
execute = api.getRoomInfo(roomConfig.getRoomId().toString()).execute();
if (execute.isSuccessful()) {
roomConfig.setRoomInfo(execute.body() != null ? execute.body().getData() : null);
}
} catch (IOException e) {
roomMap.remove(roomConfig);
throw new RuntimeException(e);
}
api.getLiveRoomDanmuInfo(String.valueOf(roomConfig.getRoomId())).enqueue(new HttpCallback<LiveDanmuInfo>() {
@Override
public void onResponse(Headers headers, int code, String status, LiveDanmuInfo response, String rawResponse) {
if (!response.getHostList().isEmpty()) {
LiveDanmuInfo.Host host = response.getHostList().get(new Random().nextInt(response.getHostList().size()));
String url = "wss://" + host.getHost() + ":" + host.getWssPort() + "/sub";
// url="ws://127.0.0.1:8765";
try {
roomConfig.setLiveInfo(response);
client = new WebSocketClientTh(new URI(url), roomConfig);
} catch (URISyntaxException e) {
roomMap.remove(roomConfig);
throw new RuntimeException(e);
}
}
}
@Override
public void onFailure(Throwable throwable) {
roomMap.remove(roomConfig);
Log.e(throwable);
}
});
}
public void close() {
client.close();
}
}
private class WebSocketClientTh extends WebSocketClient {
private final LiveRoomConfig roomConfig;
private final HeartbeatTask heartbeatTask;
BiliLiveDatabase liveDatabase;
public WebSocketClientTh(URI serverUri, LiveRoomConfig roomId) {
super(serverUri);
Log.i("WebSocketClientTh.WebSocketClientTh : " + serverUri);
this.roomConfig = roomId;
liveDatabase = liveDatabasesService.getLiveDatabase(roomConfig.getRoomId());
Brotli4jLoader.ensureAvailability();
heartbeatTask = new HeartbeatTask();
addHeader("User-Agent", ConfigTools.getUserAgent());
connect();
}
@Override
public void onOpen(ServerHandshake serverHandshake) {
heartbeatTask.setSocket(this);
heartbeatTask.sendInitAuthData();
new Timer().schedule(heartbeatTask, 1000, 30000);
Log.i("WebSocketClientTh.onOpen", roomConfig.getRoomId());
}
@Override
public void onMessage(String s) {
Log.i("s = " + s);
}
@Override
public void onMessage(ByteBuffer bytes) {
// Log.i("WebSocketClientTh.onMessage: " + roomConfig.getAnchorName());
super.onMessage(bytes);
decompress(bytes.array());
}
@Override
public void onClose(int i, String s, boolean b) {
Log.e("WebSocketClientTh.onClose", "i = " + i + ", s = " + s + ", b = " + b, roomConfig.getRoomId(), heartbeatTask.socket.isOpen());
roomMap.remove(roomConfig);
heartbeatTask.cancel();
}
@Override
public void onError(Exception e) {
Log.i("WebSocketClientTh.onError", roomConfig.getRoomId());
Log.e(e);
roomMap.remove(roomConfig);
heartbeatTask.cancel();
}
/**
* 解压缩
*
* @param data 待压缩的数据
*/
public void decompress(byte[] data) {
byte[] bytes = new byte[data.length - 16];
WebSocketHeader header = new WebSocketHeader(data);
System.arraycopy(data, header.getHeaderSize(), bytes, 0, data.length - header.getHeaderSize());
// Log.i("数据大小:" + header.getDataSize() + " 协议:" + header.getAgree() + " 头部大小:" + header.getHeaderSize() + " 命令:" + header.getCmdData());
switch (header.getAgree()) {
case 0:
case 1:
danmu(bytes);
break;
default:
unzipDanmu(bytes, header.getAgree() == 3);
}
}
private void danmu(byte[] bytes) {
//Log.i("未压缩:" + new String(bytes));
}
private void unzipDanmu(byte[] bytes, boolean useHeader) {
try {
DirectDecompress directDecompress = Decoder.decompress(bytes);
if (directDecompress.getResultStatus() == DecoderJNI.Status.DONE) {
WebSocketBody body = new WebSocketBody(directDecompress.getDecompressedData());
// Log.i("协议:" + useHeader + " 命令数:" + body.getBodyList().size());
for (JSONObject json : body.getBodyList()) {
WSData parse = WSData.parse(json);
liveDatabase.addSource(parse);
// Log.i("解压:" + parse);
}
// Log.i();
// Log.i();
} else {
Log.e(new RuntimeException("解压失败"));
}
} catch (Exception e) {
Log.e(e);
}
}
private class HeartbeatTask extends TimerTask {
WebSocketClientTh socket;
public void setSocket(WebSocketClientTh socket) {
this.socket = socket;
}
@Override
public void run() {
try {
// com.yutou.bilibili.Tools.Log.i("-------发送心跳--------");
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
outputStream.write(BytesUtils.toLH("[object Object]".length() + 16));
outputStream.write(new byte[]{0, 16, 0, 1, 0, 0, 0, 2, 0, 0, 0, 1});
outputStream.write("[object Object]".getBytes(StandardCharsets.UTF_8));
outputStream.flush();
socket.send(outputStream.toByteArray());
} catch (Exception e) {
Log.e(e);
}
}
public void sendInitAuthData() {
JSONObject json = new JSONObject();
if (roomConfig.isLogin()) {
json.put("uid", new BigInteger(roomConfig.getLoginUid()));
} else {
json.put("uid", 0);
}
LoginCookieDatabaseBean cookie = BiliBiliLoginDatabase.getInstance().getCookie(roomConfig.getLoginUid());
Log.d("cookie:", cookie, "RoomId:" + roomConfig);
try {
json.put("roomid", new BigInteger(roomConfig.getRoomId()));
json.put("protover", 3);
json.put("platform", "web");
json.put("type", 2);
json.put("key", roomConfig.getLiveInfo().getToken());
byte[] bytes = {0, 16, 0, 1, 0, 0, 0, 7, 0, 0, 0, 1};
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
// Log.i("bytes.length = " + bytes.length);
Log.i(json);
outputStream.write(BytesUtils.toLH(json.toString().length() + 16));
outputStream.write(bytes);
outputStream.write(json.toJSONString().getBytes(StandardCharsets.UTF_8));
outputStream.flush();
// BytesUtils.printHex(outputStream.toByteArray());
Log.i(socket.isOpen(), json.toString());
socket.send(outputStream.toByteArray());
} catch (Exception e) {
Log.e(e);
}
}
}
}
}