update 统一管理BiliLiveDatabase

This commit is contained in:
2024-11-28 23:18:24 +08:00
parent 3f10e19e50
commit 8afb8f8f0b
8 changed files with 79 additions and 48 deletions

View File

@@ -17,13 +17,16 @@ import com.yutou.biliapi.bean.websocket.live.WSData;
import com.yutou.biliapi.databases.BiliBiliLoginDatabase;
import com.yutou.biliapi.databases.BiliLiveDatabase;
import com.yutou.biliapi.utils.BytesUtils;
import com.yutou.bilibili.services.LiveDatabasesService;
import com.yutou.common.okhttp.HttpBody;
import com.yutou.common.okhttp.HttpCallback;
import com.yutou.common.utils.ConfigTools;
import com.yutou.common.utils.Log;
import jakarta.annotation.Resource;
import okhttp3.Headers;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
import org.springframework.stereotype.Service;
import retrofit2.Response;
import java.io.ByteArrayOutputStream;
@@ -38,24 +41,20 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class WebSocketClientManager {
@Service
public class WebSocketServer {
ThreadPoolExecutor executor;
private static WebSocketClientManager instance;
private static WebSocketServer instance;
Map<LiveRoomConfig, DanmuTask> roomMap;
private final List<String> userStopList = new ArrayList<>();//手动停止列表
@Resource
LiveDatabasesService liveDatabasesService;
private WebSocketClientManager() {
private WebSocketServer() {
roomMap = new HashMap<>();
executor = new ThreadPoolExecutor(2, 4, Long.MAX_VALUE, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100));
}
public static WebSocketClientManager getInstance() {
if (instance == null) {
instance = new WebSocketClientManager();
}
return instance;
}
public boolean checkRoom(LiveRoomConfig roomConfig) {
return roomMap.containsKey(roomConfig);
}
@@ -98,13 +97,13 @@ public class WebSocketClientManager {
}
}
private static class DanmuTask implements Runnable {
private class DanmuTask implements Runnable {
LiveRoomConfig roomConfig;
WebSocketClientTh client;
public DanmuTask(LiveRoomConfig config) {
this.roomConfig = config;
WebSocketClientManager.getInstance().roomMap.put(roomConfig, this);
roomMap.put(roomConfig, this);
}
@Override
@@ -117,7 +116,7 @@ public class WebSocketClientManager {
roomConfig.setRoomInfo(execute.body() != null ? execute.body().getData() : null);
}
} catch (IOException e) {
WebSocketClientManager.getInstance().roomMap.remove(roomConfig);
roomMap.remove(roomConfig);
throw new RuntimeException(e);
}
api.getLiveRoomDanmuInfo(String.valueOf(roomConfig.getRoomId())).enqueue(new HttpCallback<LiveDanmuInfo>() {
@@ -131,7 +130,7 @@ public class WebSocketClientManager {
roomConfig.setLiveInfo(response);
client = new WebSocketClientTh(new URI(url), roomConfig);
} catch (URISyntaxException e) {
WebSocketClientManager.getInstance().roomMap.remove(roomConfig);
roomMap.remove(roomConfig);
throw new RuntimeException(e);
}
}
@@ -139,7 +138,7 @@ public class WebSocketClientManager {
@Override
public void onFailure(Throwable throwable) {
WebSocketClientManager.getInstance().roomMap.remove(roomConfig);
roomMap.remove(roomConfig);
Log.e(throwable);
}
});
@@ -150,7 +149,7 @@ public class WebSocketClientManager {
}
}
private static class WebSocketClientTh extends WebSocketClient {
private class WebSocketClientTh extends WebSocketClient {
private final LiveRoomConfig roomConfig;
private final HeartbeatTask heartbeatTask;
BiliLiveDatabase liveDatabase;
@@ -159,7 +158,7 @@ public class WebSocketClientManager {
super(serverUri);
Log.i("WebSocketClientTh.WebSocketClientTh : " + serverUri);
this.roomConfig = roomId;
liveDatabase = new BiliLiveDatabase(roomConfig);
liveDatabase = liveDatabasesService.getLiveDatabase(roomConfig.getRoomId());
Brotli4jLoader.ensureAvailability();
heartbeatTask = new HeartbeatTask();
addHeader("User-Agent", ConfigTools.getUserAgent());
@@ -190,8 +189,7 @@ public class WebSocketClientManager {
@Override
public void onClose(int i, String s, boolean b) {
Log.e("WebSocketClientTh.onClose", "i = " + i + ", s = " + s + ", b = " + b, roomConfig.getRoomId(), heartbeatTask.socket.isOpen());
WebSocketClientManager.getInstance().roomMap.remove(roomConfig);
liveDatabase.close();
roomMap.remove(roomConfig);
heartbeatTask.cancel();
}
@@ -199,8 +197,7 @@ public class WebSocketClientManager {
public void onError(Exception e) {
Log.i("WebSocketClientTh.onError", roomConfig.getRoomId());
Log.e(e);
WebSocketClientManager.getInstance().roomMap.remove(roomConfig);
liveDatabase.close();
roomMap.remove(roomConfig);
heartbeatTask.cancel();
}