Commit 212e4a66 by 熊付

【ws】

1 parent 12c715c6
...@@ -23,9 +23,9 @@ def dataRecv(): ...@@ -23,9 +23,9 @@ def dataRecv():
data = request.get_data() data = request.get_data()
dataDic = json.loads(data) dataDic = json.loads(data)
if type(dataDic) is dict: if type(dataDic) is dict:
send_all_data ( sv_global.get_value (), data )
# res = etl.eventToLocal.saveEventToLocal(dataDic) # res = etl.eventToLocal.saveEventToLocal(dataDic)
# print(res) # print(res)
send_all_data(sv_global.get_value(),data)
#存入本地数据库 #存入本地数据库
# save_alarm_data(data,res) # save_alarm_data(data,res)
else: else:
......
...@@ -29,7 +29,7 @@ class Server (): ...@@ -29,7 +29,7 @@ class Server ():
self.sock.bind ( (h,p) ) # 监听端口 self.sock.bind ( (h,p) ) # 监听端口
#self.sock.bind ( (config['HOST'], config['PORT']) ) # 监听端口 #self.sock.bind ( (config['HOST'], config['PORT']) ) # 监听端口
self.sock.listen ( config['LISTEN_CLIENT'] ) # 监听客户端数量 self.sock.listen ( config['LISTEN_CLIENT'] ) # 监听客户端数量
self.sock.setsockopt(SOL_SOCKET, SO_LINGER, 1)
# 所有监听的客户端 # 所有监听的客户端
self.clients = {} self.clients = {}
self.thrs = {} self.thrs = {}
...@@ -38,10 +38,11 @@ class Server (): ...@@ -38,10 +38,11 @@ class Server ():
self.h=h self.h=h
self.p=p self.p=p
self.subtask_id=None self.subtask_id=None
self.subtaskids={}
# 监听客户端连接 # 监听客户端连接
def listen_client(self): def listen_client(self):
while 1: while 1:
# time.sleep(500) time.sleep(0.001)
# 循环监听 # 循环监听
tcpClientSock, addr = self.sock.accept () tcpClientSock, addr = self.sock.accept ()
address = addr[0] + ':' + str ( addr[1] ) # ip:port address = addr[0] + ':' + str ( addr[1] ) # ip:port
...@@ -83,14 +84,27 @@ class Server (): ...@@ -83,14 +84,27 @@ class Server ():
# config['PORT'] ) ) # config['PORT'] ) )
str_handshake = config['HANDSHAKE_STRING'].replace ( '{1}', res_key ).replace ( '{2}',self.h+ ':' + str (self.p) ) str_handshake = config['HANDSHAKE_STRING'].replace ( '{1}', res_key ).replace ( '{2}',self.h+ ':' + str (self.p) )
tcpClientSock.send ( str_handshake ) tcpClientSock.send ( str_handshake )
num = len ( self.clients )
if num > 0:
for i in range ( num ):
if len ( self.clients ) > 0:
for adr_pot in self.clients:
_index = adr_pot.index ( ':' )
adr = adr_pot[0:_index]
if addr[0] == adr:
self.close_client ( adr_pot )
break
# 握手成功 分配线程进行监听 # 握手成功 分配线程进行监听
print(address + '进来了') print(address + '进来了')
#index = address.index(':')
#address = address[0:index]
#client = self.clients.pop(address)
#if client :
# client.close()
self.clients[address] = tcpClientSock self.clients[address] = tcpClientSock
self.thrs[address] = threading.Thread ( target=self.readMsg, args=[address] ) self.thrs[address] = threading.Thread ( target=self.readMsg, args=[address] )
self.thrs[address].start () self.thrs[address].start ()
time.sleep ( 0.001 ) #time.sleep(0.001)
# print(self.clients) # print(self.clients)
def readMsg(self, address): def readMsg(self, address):
...@@ -103,6 +117,7 @@ class Server (): ...@@ -103,6 +117,7 @@ class Server ():
import select import select
time_out = 0 time_out = 0
while 1: while 1:
time.sleep ( 0.001 )
# print(len(self.clients)) # print(len(self.clients))
if address in self.stops: if address in self.stops:
self.close_client ( address ) self.close_client ( address )
...@@ -150,7 +165,7 @@ class Server (): ...@@ -150,7 +165,7 @@ class Server ():
raw_str += chr ( ord ( d ) ^ ord ( masks[i % 4] ) ) raw_str += chr ( ord ( d ) ^ ord ( masks[i % 4] ) )
# print(raw_str) # print(raw_str)
i += 1 i += 1
print raw_str
# 获取到输入的数据 向所有的客户端发送 # 获取到输入的数据 向所有的客户端发送
# 开启线程记录 # 开启线程记录
...@@ -161,30 +176,43 @@ class Server (): ...@@ -161,30 +176,43 @@ class Server ():
if type(da) is dict: if type(da) is dict:
if da.has_key('subtask_id'): if da.has_key('subtask_id'):
self.subtask_id = da['subtask_id'] self.subtask_id = da['subtask_id']
self.subtaskids[address]=da['subtask_id']
except Exception as e: except Exception as e:
print e print e
time.sleep(0.001) time.sleep(0.001)
#t1 = threading.Thread ( target=self.send_data, args=[raw_str, address] )
#t1.start ()
def send_data(self, str_data, address): def send_data(self, data, address):
#data = eval(data) subtask_id=None
if type(str_data) is str: try:
data = json.loads (str_data) if type(data) is str:
if type(data) is dict: dict_data = json.loads (data)
if data.has_key('subtask_id'): if type(dict_data) is dict:
subtask_id = data['subtask_id'] if dict_data.has_key('subtask_id'):
if self.subtask_id and self.subtask_id == subtask_id: subtask_id = dict_data['subtask_id']
#if self.subtask_id is None or self.subtask_id != subtask_id:
# subtask_id = None
except:
subtask_id=None
try:
current_id=self.subtaskids[address]
except:
current_id=None
if subtask_id is None or current_id is None or current_id != subtask_id:
return
import struct import struct
from urllib import unquote from urllib import unquote
try: try:
username = unquote ( self.users[address] ) username = unquote ( self.users[address] )
except: except:
username = '匿名用户' username = '匿名用户'
# if data: #if data:
# data = str ( '【' + username + '说】' + data ) # data = str ( '【' + username + '说】' + data )
# else: #else:
# return False # return False
token = "\x81" token = "\x81"
length = len ( str_data ) length = len ( data )
if length < 126: if length < 126:
print '126' print '126'
token += struct.pack ( "B", length ) token += struct.pack ( "B", length )
...@@ -197,10 +225,10 @@ class Server (): ...@@ -197,10 +225,10 @@ class Server ():
print 'token=%s' % token print 'token=%s' % token
# struct为Python中处理二进制数的模块,二进制流为C,或网络流的形式。 # struct为Python中处理二进制数的模块,二进制流为C,或网络流的形式。
str_data = '%s%s' % (token, str_data) data = '%s%s' % (token, data)
print str_data #print data
try: try:
# for key, val in self.clients.iteritems (): #for key, val in self.clients.iteritems ():
# print # print
# client = val # client = val
# try: # try:
...@@ -209,22 +237,20 @@ class Server (): ...@@ -209,22 +237,20 @@ class Server ():
# except: # except:
# self.close_client ( key ) # self.close_client ( key )
client = self.clients[address] client = self.clients[address]
client.send ( str_data ) client.send ( data )
except: except:
pass pass
else:
pass
else:
pass
else:
pass
def close_client(self, address): def close_client(self, address):
try: try:
client = self.clients.pop ( address ) client = self.clients.pop ( address )
self.stops.append ( address ) self.stops.append ( address )
client.close () client.close ()
del self.users[address] del self.users[address]
thr = self.thrs.pop ( address )
thr.join ()
self.subtaskids.pop(address)
except: except:
pass pass
......
No preview for this file type
...@@ -57,7 +57,7 @@ def init(): ...@@ -57,7 +57,7 @@ def init():
database._init() database._init()
dict_p._init() dict_p._init()
dict_p.set_value(dictProperties) dict_p.set_value(dictProperties)
con = fdb.connect ( host='192.168.9.239', database='/var/lib/firebird/2.5/data/little_star.fdb', user='VION_FDB',password='vion',charset='UTF8' ) con = fdb.connect ( host='192.168.88.189', database='/var/lib/firebird/2.5/data/little_star.fdb', user='SYSDBA',password='9512d49b',charset='UTF8' )
cur = con.cursor () cur = con.cursor ()
print (con) print (con)
database.set_value(con) database.set_value(con)
...@@ -84,7 +84,7 @@ def init(): ...@@ -84,7 +84,7 @@ def init():
if __name__=='__main__': if __name__=='__main__':
dict_p._init () dict_p._init ()
dict_p.set_value ( dictProperties ) dict_p.set_value ( dictProperties )
init() #init()
_h=dictProperties['tx1_host'] _h=dictProperties['tx1_host']
_p=int(dictProperties['tx1_ws_port']) _p=int(dictProperties['tx1_ws_port'])
etl.eventToLocal.setRootPath(dictProperties['save_root_path']) etl.eventToLocal.setRootPath(dictProperties['save_root_path'])
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!