调整websocket,加线程处理数据,防止阻塞(可能?)

This commit is contained in:
zlzw 2024-12-10 14:58:12 +08:00
parent d2bcd5c64a
commit 516e030266
4 changed files with 22 additions and 17 deletions

View File

@ -9,7 +9,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication()
public class BilibiliApplication {
public static String version = "0.9";
public static String version = "0.10";
public static void main(String[] args) {
Log.i("启动版本", version);

View File

@ -110,7 +110,7 @@ public class BiliLiveDatabase extends SQLiteManager {
}
}
public void addSource(WSData bean) {
public synchronized void addSource(WSData bean) {
// Log.i("BiliLiveDatabase.addSource", config.getRoomId());
add(new LiveSourceDatabaseBean(bean));
addData(bean);

View File

@ -224,18 +224,23 @@ public class WebSocketServer {
* @param data 待压缩的数据
*/
public void decompress(byte[] data) {
byte[] bytes = new byte[data.length - 16];
WebSocketHeader header = new WebSocketHeader(data);
System.arraycopy(data, header.getHeaderSize(), bytes, 0, data.length - header.getHeaderSize());
// Log.i("数据大小:" + header.getDataSize() + " 协议:" + header.getAgree() + " 头部大小:" + header.getHeaderSize() + " 命令:" + header.getCmdData());
switch (header.getAgree()) {
case 0:
case 1:
danmu(bytes);
break;
default:
unzipDanmu(bytes, header.getAgree() == 3);
}
Thread.ofVirtual()
.name("TaskUnDanmu-" + Thread.currentThread().getName())
.start(() -> {
byte[] bytes = new byte[data.length - 16];
WebSocketHeader header = new WebSocketHeader(data);
System.arraycopy(data, header.getHeaderSize(), bytes, 0, data.length - header.getHeaderSize());
switch (header.getAgree()) {
case 0:
case 1:
danmu(bytes);
break;
default:
unzipDanmu(bytes, header.getAgree() == 3);
}
});
}
@ -249,7 +254,7 @@ public class WebSocketServer {
DirectDecompress directDecompress = Decoder.decompress(bytes);
if (directDecompress.getResultStatus() == DecoderJNI.Status.DONE) {
WebSocketBody body = new WebSocketBody(directDecompress.getDecompressedData());
Log.getDynamicLogger(logTag).info("协议:{},命令数:{}", useHeader, +body.getBodyList().size());
Log.getDynamicLogger(logTag).info("协议:{},命令数:{}", useHeader, body.getBodyList().size());
for (JSONObject json : body.getBodyList()) {
WSData parse = WSData.parse(json);
liveDatabasesService.getLiveDatabase(roomConfig.getRoomId()).addSource(parse);

View File

@ -104,7 +104,7 @@ public abstract class SQLiteManager {
}
}
protected <T extends AbsDatabasesBean> void add(T t) {
protected synchronized <T extends AbsDatabasesBean> void add(T t) {
StringBuilder sb = new StringBuilder();
try (PreparedStatement statement = getConnection().prepareStatement(buildInsertSql(t))) {
JSONObject json = t.toJson();
@ -162,7 +162,7 @@ public abstract class SQLiteManager {
return sb.toString();
}
protected <T extends AbsDatabasesBean> void update(T t) {
protected synchronized <T extends AbsDatabasesBean> void update(T t) {
try (PreparedStatement statement = getConnection().prepareStatement(buildUpdateSql(t))) {
JSONObject json = t.toJson();
Set<String> keySet = json.keySet();
@ -367,7 +367,7 @@ public abstract class SQLiteManager {
return json;
}
protected <T extends AbsDatabasesBean> boolean delete(T t) {
protected synchronized <T extends AbsDatabasesBean> boolean delete(T t) {
Statement statement = null;
try {
statement = getConnection().createStatement();