mirror of
git://xwords.git.sourceforge.net/gitroot/xwords/xwords
synced 2025-01-08 05:24:39 +01:00
rewrite of gcm_loop.py to use FCM
Works on my test VM. Untested on production.
This commit is contained in:
parent
0c52d4e550
commit
4bac49b76a
1 changed files with 302 additions and 0 deletions
302
xwords4/relay/scripts/fcm_loop.py
Normal file
302
xwords4/relay/scripts/fcm_loop.py
Normal file
|
@ -0,0 +1,302 @@
|
|||
#!/usr/bin/python
|
||||
|
||||
# Meant to be run on the server that's hosting the relay, loops,
|
||||
# checking the relay for new messages whose target devices have GCM
|
||||
# ids and sending GCM notifications to them.
|
||||
#
|
||||
# Depends on the gcm module
|
||||
|
||||
import getpass, sys, psycopg2, time, signal, shelve, json, urllib2
|
||||
from time import gmtime, strftime
|
||||
from os import path
|
||||
from oauth2client.service_account import ServiceAccountCredentials
|
||||
|
||||
import mykey
|
||||
|
||||
FCM_URL = 'https://fcm.googleapis.com/v1/projects/fcmtest-9fe99/messages:send'
|
||||
SCOPES = ["https://www.googleapis.com/auth/firebase.messaging"]
|
||||
|
||||
def get_access_token():
|
||||
credentials = ServiceAccountCredentials.from_json_keyfile_name(
|
||||
'service-account.json', SCOPES)
|
||||
access_token_info = credentials.get_access_token()
|
||||
print 'token:', access_token_info.access_token
|
||||
return access_token_info.access_token
|
||||
|
||||
# g_accessToken = "abcd" # start with illegal value
|
||||
g_accessToken = get_access_token()
|
||||
|
||||
# Backoff strategy
|
||||
#
|
||||
# A message is considered in need of delivery as long as its in the
|
||||
# msgs table, and the expected behavior is that as soon as a device
|
||||
# receives a GCM notification it fetches all messages so that they're
|
||||
# deleted. But when a device is offline we don't get any errors, so
|
||||
# while a message remains in that table we need to be sure we don't
|
||||
# ask GCM to contact the device too often.
|
||||
#
|
||||
# But it's devices we contact, not messages. A device is in the
|
||||
# contact list if it is the target of at least one message in the msgs
|
||||
# table.
|
||||
|
||||
k_shelfFile = path.splitext( path.basename( sys.argv[0]) )[0] + ".shelf"
|
||||
k_SENT = 'SENT'
|
||||
g_useStime = True
|
||||
g_con = None
|
||||
g_sent = None
|
||||
g_debug = False
|
||||
g_skipSend = False # for debugging
|
||||
g_sendAll = False
|
||||
g_columns = [ 'id', 'devid', 'connname', 'hid', 'msg64' ]
|
||||
DEVTYPE_GCM = 3 # 3 == GCM
|
||||
DEVTYPE_FCM = 6 # FCM, from DUtilCtxt.java
|
||||
LINE_LEN = 76
|
||||
|
||||
def init():
|
||||
global g_sent
|
||||
try:
|
||||
user = getpass.getuser()
|
||||
print 'user:', user
|
||||
port = mykey.psqlPort
|
||||
con = psycopg2.connect(port=port, database='xwgames', user=user)
|
||||
except psycopg2.DatabaseError, e:
|
||||
print 'Error %s' % e
|
||||
sys.exit(1)
|
||||
|
||||
shelf = shelve.open( k_shelfFile )
|
||||
if k_SENT in shelf: g_sent = shelf[k_SENT]
|
||||
else: g_sent = {}
|
||||
shelf.close();
|
||||
if g_debug: print 'init(): g_sent:', g_sent
|
||||
|
||||
return con
|
||||
|
||||
# WHERE stime IS NULL
|
||||
|
||||
def getPendingMsgs( con, typ ):
|
||||
cur = con.cursor()
|
||||
query = "SELECT %s FROM msgs WHERE "
|
||||
if g_useStime:
|
||||
query += " stime = 'epoch' AND "
|
||||
query += """ devid IN (SELECT id FROM devices WHERE devtypes[1]=%d and NOT unreg)
|
||||
AND (connname IS NULL OR NOT connname IN (SELECT connname FROM games WHERE dead));"""
|
||||
cur.execute(query % (",".join( g_columns ), typ))
|
||||
|
||||
result = []
|
||||
for row in cur:
|
||||
rowObj = {}
|
||||
for ii in range( len( g_columns ) ):
|
||||
rowObj[g_columns[ii]] = row[ii]
|
||||
result.append( rowObj )
|
||||
if g_debug: print "getPendingMsgs=>", result
|
||||
return result
|
||||
|
||||
def addClntVers( con, rows ):
|
||||
query = """select clntVers from devices where id = %d;"""
|
||||
cur = con.cursor()
|
||||
for row in rows:
|
||||
cur.execute( query % (row['devid']) )
|
||||
if cur.rowcount == 1: row['clntVers'] = cur.fetchone()[0]
|
||||
else: print "bad row count: ", cur.rowcount
|
||||
con.commit()
|
||||
return rows
|
||||
|
||||
def deleteMsgs( con, msgIDs ):
|
||||
if 0 < len( msgIDs ):
|
||||
if g_useStime:
|
||||
query = "UPDATE msgs SET stime = 'now' where id in (%s);" % ",".join(msgIDs)
|
||||
else:
|
||||
query = "DELETE from msgs where id in (%s);" % ",".join(msgIDs)
|
||||
try:
|
||||
cur = con.cursor()
|
||||
cur.execute(query)
|
||||
con.commit()
|
||||
except psycopg2.DatabaseError, e:
|
||||
print 'Error %s' % e
|
||||
except Exception as inst:
|
||||
print "failed to execute", query
|
||||
print type(inst)
|
||||
print inst.args
|
||||
print inst
|
||||
|
||||
def unregister( gcmid ):
|
||||
global g_con
|
||||
print "unregister(", gcmid, ")"
|
||||
query = "UPDATE devices SET unreg=TRUE WHERE devids[1] = '%s' and devtypes[1] = 3" % gcmid
|
||||
g_con.cursor().execute( query )
|
||||
g_con.commit()
|
||||
|
||||
def asGCMIds(con, devids, typ):
|
||||
cur = con.cursor()
|
||||
query = "SELECT devids[1] FROM devices WHERE devtypes[1] = %d AND id IN (%s)" \
|
||||
% (typ, ",".join([str(y) for y in devids]))
|
||||
cur.execute( query )
|
||||
result = [elem[0] for elem in cur.fetchall()]
|
||||
if g_debug: print 'asGCMIds() =>', result
|
||||
return result
|
||||
|
||||
def notifyViaFCM( devids, typ, target ):
|
||||
global g_accessToken
|
||||
success = False
|
||||
if typ == DEVTYPE_FCM:
|
||||
if 'clntVers' in target and 3 <= target['clntVers'] and target['msg64']:
|
||||
# data = { 'msgs64': str([target['msg64']]) },
|
||||
data = { 'msgs64': json.dumps([target['msg64']]) }
|
||||
if target['connname'] and target['hid']:
|
||||
data['connname'] = "%s/%d" % (target['connname'], target['hid'])
|
||||
else:
|
||||
data = { 'getMoves': True, }
|
||||
values = {
|
||||
'message' : {
|
||||
'token' : devids[0],
|
||||
'data' : data,
|
||||
}
|
||||
}
|
||||
params = json.dumps( values )
|
||||
|
||||
if g_skipSend:
|
||||
print
|
||||
print "not sending:", params
|
||||
else:
|
||||
for ignore in [True, True]: # try twice at most
|
||||
req = urllib2.Request( FCM_URL, params )
|
||||
req.add_header( 'Authorization', 'Bearer ' + g_accessToken )
|
||||
req.add_header( 'Content-Type', 'application/json' )
|
||||
try:
|
||||
response = urllib2.urlopen( req ).read()
|
||||
asJson = json.loads( response )
|
||||
|
||||
if 'success' in asJson and 'failure' in asJson and len(devids) == asJson['success'] \
|
||||
and 0 == asJson['failure']:
|
||||
print "OK; no failures"
|
||||
success = True
|
||||
else:
|
||||
print "Errors: "
|
||||
print response
|
||||
break
|
||||
|
||||
except urllib2.URLError as e:
|
||||
print 'error from urlopen:', e.reason
|
||||
if e.reason == 'Unauthorized':
|
||||
g_accessToken = get_access_token()
|
||||
else:
|
||||
break
|
||||
|
||||
else:
|
||||
print "not sending to", len(devids), "devices because typ ==", typ
|
||||
return success
|
||||
|
||||
def shouldSend(val):
|
||||
return g_sendAll or val == 1
|
||||
# pow = 1
|
||||
# while pow < val:
|
||||
# pow *= 3
|
||||
# return pow == val
|
||||
|
||||
# given a list of msgid, devid lists, figure out which messages should
|
||||
# be sent/resent now and mark them as sent. Backoff is based on
|
||||
# msgids: if the only messages a device has pending have been seen
|
||||
# before, backoff applies.
|
||||
def targetsAfterBackoff( msgs ):
|
||||
global g_sent
|
||||
targets = {}
|
||||
for row in msgs:
|
||||
msgid = row['id']
|
||||
devid = row['devid']
|
||||
if not msgid in g_sent:
|
||||
g_sent[msgid] = 0
|
||||
g_sent[msgid] += 1
|
||||
if shouldSend( g_sent[msgid] ):
|
||||
targets[devid] = row
|
||||
return targets
|
||||
|
||||
# devids is an array of (msgid, devid) tuples
|
||||
def pruneSent( devids ):
|
||||
global g_sent
|
||||
if g_debug: print "pruneSent: before:", g_sent
|
||||
lenBefore = len(g_sent)
|
||||
msgids = []
|
||||
for row in devids:
|
||||
msgids.append(row['id'])
|
||||
for msgid in g_sent.keys():
|
||||
if not msgid in msgids:
|
||||
del g_sent[msgid]
|
||||
if g_debug: print "pruneSent: after:", g_sent
|
||||
|
||||
def cleanup():
|
||||
global g_con, g_sent
|
||||
if g_con:
|
||||
g_con.close()
|
||||
g_con = None
|
||||
shelf = shelve.open( k_shelfFile )
|
||||
shelf[k_SENT] = g_sent
|
||||
shelf.close();
|
||||
|
||||
def handleSigTERM( one, two ):
|
||||
print 'handleSigTERM called: ', one, two
|
||||
cleanup()
|
||||
|
||||
def usage():
|
||||
print "usage:", sys.argv[0], "[--loop <nSeconds>] [--type typ] [--verbose]"
|
||||
sys.exit();
|
||||
|
||||
def main():
|
||||
global g_con, g_sent, g_debug
|
||||
loopInterval = 0
|
||||
g_con = init()
|
||||
emptyCount = 0
|
||||
typ = DEVTYPE_FCM
|
||||
|
||||
ii = 1
|
||||
while ii < len(sys.argv):
|
||||
arg = sys.argv[ii]
|
||||
if arg == '--loop':
|
||||
ii += 1
|
||||
loopInterval = float(sys.argv[ii])
|
||||
elif arg == '--type':
|
||||
ii += 1
|
||||
typ = int(sys.argv[ii])
|
||||
elif arg == '--verbose':
|
||||
g_debug = True
|
||||
else:
|
||||
usage()
|
||||
ii = ii + 1
|
||||
|
||||
signal.signal( signal.SIGTERM, handleSigTERM )
|
||||
signal.signal( signal.SIGINT, handleSigTERM )
|
||||
|
||||
while g_con:
|
||||
if g_debug: print
|
||||
devids = getPendingMsgs( g_con, typ )
|
||||
if 0 < len(devids):
|
||||
devids = addClntVers( g_con, devids )
|
||||
targets = targetsAfterBackoff( devids )
|
||||
if 0 < len(targets):
|
||||
if 0 < emptyCount: print ""
|
||||
emptyCount = 0
|
||||
print strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
|
||||
if g_debug: print "devices needing notification:", targets, '=>',
|
||||
toDelete = []
|
||||
for devid in targets.keys():
|
||||
target = targets[devid]
|
||||
if notifyViaFCM( asGCMIds(g_con, [devid], typ), typ, target ) \
|
||||
and 3 <= target['clntVers'] \
|
||||
and target['msg64']:
|
||||
toDelete.append( str(target['id']) )
|
||||
pruneSent( devids )
|
||||
deleteMsgs( g_con, toDelete )
|
||||
elif g_debug: print "no targets after backoff"
|
||||
else:
|
||||
emptyCount += 1
|
||||
if (0 == (emptyCount%5)) and not g_debug:
|
||||
sys.stdout.write('.')
|
||||
sys.stdout.flush()
|
||||
if 0 == (emptyCount % (LINE_LEN*5)): print ""
|
||||
if 0 == loopInterval: break
|
||||
time.sleep( loopInterval )
|
||||
|
||||
cleanup()
|
||||
|
||||
##############################################################################
|
||||
if __name__ == '__main__':
|
||||
main()
|
Loading…
Reference in a new issue