Commit b49ca480 by xmh

修改

1 parent 27b79990
package com.viontech;
import com.viontech.model.KeepAlive;
import com.viontech.netty.ChannelGroup;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
@SpringBootApplication(exclude = org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration.class)
@Slf4j
@EnableScheduling
public class Application {
public final static long REAL_TIME = 0x00000001L;
public final static long NOT_REAL_TIME = 0x00000002L;
......@@ -18,4 +23,8 @@ public class Application {
}
}
@Scheduled(fixedRate = 5000L, initialDelay = 5000L)
public void keepAlive() {
ChannelGroup.broadcast(new KeepAlive());
}
}
......@@ -218,7 +218,7 @@ public class FTPClientHelper {
private boolean makeDirectorys(FTPClient ftpclient, String remote) throws Exception {
File file = new File(remote);
if (file.getPath().indexOf(File.separator) == -1) { //是根路径
if (!file.getPath().contains(File.separator)) {
ftpclient.makeDirectory(file.getPath());
} else {
makeDirectorys(ftpclient, file.getParent());
......
......@@ -15,7 +15,7 @@ import lombok.experimental.Accessors;
@Setter
@Accessors(chain = true)
public class BaseModel {
protected long deviceId;
protected long deviceId = 1;
protected long length;
protected long protocol;
protected long flag;
......@@ -34,11 +34,11 @@ public class BaseModel {
long deviceId = byteBuf.readUnsignedIntLE();
long length = byteBuf.readUnsignedIntLE();
long firstInt = byteBuf.readUnsignedIntLE();
long secondInt = byteBuf.readUnsignedIntLE();
long protocol = byteBuf.readUnsignedIntLE();
long flag = byteBuf.readUnsignedIntLE();
BaseModel model;
switch ((int) firstInt) {
switch ((int) protocol) {
case 0x00010100:
model = new LoginData();
break;
......@@ -51,8 +51,8 @@ public class BaseModel {
model.setDeviceId(deviceId);
model.setLength(length);
model.setProtocol(firstInt);
model.setFlag(secondInt);
model.setProtocol(protocol);
model.setFlag(flag);
if (length > 16) {
model.data = new byte[(int) (length - 16)];
byteBuf.readBytes(model.data);
......@@ -67,16 +67,16 @@ public class BaseModel {
/**
* 将 model 转换为 byteBuf
*
* @return byteBuf
*/
public void to(ByteBuf byteBuf) {
encodeData();
if (length == 0) {
length = 16 + (data == null ? 0 : data.length);
}
byteBuf.writeIntLE(toInt(deviceId));
byteBuf.writeIntLE(toInt(length));
byteBuf.writeIntLE(toInt(protocol));
byteBuf.writeIntLE(toInt(flag));
byteBuf.writeInt(toInt(protocol));
byteBuf.writeInt(toInt(flag));
if (data != null && data.length > 0) {
byteBuf.writeBytes(data);
}
......
......@@ -59,7 +59,6 @@ public class BehaviorModel extends BaseModel {
@Override
public void encodeData() {
this.deviceId = 1;
this.length = 16L + 448L + picture.length;
this.protocol = 0x00040006L;
this.flag = Application.REAL_TIME;
......
......@@ -31,7 +31,6 @@ public class FlowModel extends BaseModel {
@Override
public void encodeData() {
this.deviceId = 1;
this.length = 540L;
this.protocol = 0x00040005L;
this.flag = Application.REAL_TIME;
......
......@@ -7,12 +7,12 @@ package com.viontech.model;
* @date 2020/8/21
*/
public class KeepAlive extends BaseModel{
public class KeepAlive extends BaseModel {
@Override
public void encodeData() {
this.protocol = 0x0001FFFF;
}
@Override
......
package com.viontech.model;
import com.fasterxml.jackson.core.util.ByteArrayBuilder;
import com.sun.xml.internal.stream.util.BufferAllocator;
import com.viontech.Application;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
......@@ -98,7 +96,6 @@ public class TrafficModel extends BaseModel {
@Override
public void encodeData() {
this.deviceId = 1;
this.protocol = 0x00020004L;
this.length = 16L + 452L + picture1.length + picture2.length + 4;
this.flag = Application.REAL_TIME;
......
......@@ -4,6 +4,7 @@ import com.viontech.model.BaseModel;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageCodec;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
......@@ -13,13 +14,14 @@ import java.util.List;
* @author 谢明辉
* @date 2020/8/21
*/
@Slf4j
public class ByteToMessageCodecHandler extends ByteToMessageCodec<BaseModel> {
@Override
protected void encode(ChannelHandlerContext ctx, BaseModel msg, ByteBuf out) throws Exception {
msg.to(out);
log.info("发送消息-长度:[{}],协议:[{}]", msg.getLength(), Integer.toHexString((int)msg.getProtocol()));
}
@Override
......
......@@ -3,6 +3,7 @@ package com.viontech.netty;
import com.viontech.model.BaseModel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import java.util.LinkedHashMap;
......@@ -65,4 +66,8 @@ public class ChannelGroup {
}
}
}
public synchronized static void unRegistered(ChannelHandlerContext ctx) {
DEVICE_CHANNEL_MAP.entrySet().removeIf(next -> next.getValue().compareTo(ctx.channel()) == 0);
}
}
......@@ -62,5 +62,6 @@ public class NettyReceiverHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelUnregistered(ChannelHandlerContext ctx) {
log.info("断开连接:{}", ctx.channel().id().toString());
ChannelGroup.unRegistered(ctx);
}
}
......@@ -27,7 +27,7 @@ public interface Process {
ip = "null";
LOGGER.error("", e);
}
return "/video/" + yyyyMMdd + "/" + ip + "/" + deviceId + "/" + fileName;
return "video/" + yyyyMMdd + "/" + ip + "/" + deviceId + "/" + fileName;
}
static byte[] downloadFile(String url) {
......
......@@ -39,13 +39,17 @@ public class ProcessService {
return null;
}
switch (eventCate) {
//违法
case "traffic":
if ("vehicle".equals(eventType)) {
return trafficProcess.process(jsonObject);
} else if ("tflow".equals(eventType)) {
}
// 流量
else if ("tflow".equals(eventType)) {
return flowProcess.process(jsonObject);
}
return null;
//综治
case "behavior":
return behaviorProcess.process(jsonObject);
default:
......
......@@ -42,6 +42,9 @@ public class TrafficProcess implements Process {
JSONObject lane = eventData.getJSONObject("lane");
JSONObject location = eventData.getJSONObject("location");
JSONObject illegal = eventData.getJSONObject("illegal");
if (illegal != null && illegal.getInteger("state") == 0) {
return null;
}
JSONArray pics = jsonObject.getJSONArray("pics");
model.setSerialNum(IntUtils.next());
String deviceSerialnum = jsonObject.getString("vchan_refid");
......@@ -86,7 +89,11 @@ public class TrafficProcess implements Process {
// todo 下载录像
byte[] bytes = Process.downloadFile(src_url);
String filename = item.getString("ofilename");
String filePath = Process.getFileFTPPath(eventTime, deviceSerialnum, filename);
String filePath = "";
try {
filePath = Process.getFileFTPPath(eventTime, deviceSerialnum, filename);
} catch (Exception ignore) {
}
// ftpClientHelper.storeFile(filePath, bytes);
byte[] videoPathBytes = filePath.getBytes(StandardCharsets.UTF_8);
System.arraycopy(videoPathBytes, 0, model.getVideoPath(), 0, videoPathBytes.length);
......@@ -120,7 +127,13 @@ public class TrafficProcess implements Process {
model.setPicture1(pic1Bytes);
model.setFirstPictureSize(pic1Bytes.length);
String filename = pic1.getString("ofilename");
String fileFTPPath = Process.getFileFTPPath(eventTime, deviceSerialnum, filename);
String fileFTPPath = "";
try {
fileFTPPath = Process.getFileFTPPath(eventTime, deviceSerialnum, filename);
} catch (Exception ignore) {
}
ftpClientHelper.storeFile(fileFTPPath, pic1Bytes);
byte[] pic1UrlBytes = fileFTPPath.getBytes();
System.arraycopy(pic1UrlBytes, 0, model.getPicturePath(), 0, Math.min(128, pic1UrlBytes.length));
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!