#!/usr/bin/python # Meant to be run on the server that's hosting the relay, loops, # checking the relay for new messages whose target devices have GCM # ids and sending GCM notifications to them. # # Depends on the gcm module import getpass, sys, psycopg2, time, signal, shelve, json, urllib2 from time import gmtime, strftime from os import path from oauth2client.service_account import ServiceAccountCredentials import mykey FCM_URL = 'https://fcm.googleapis.com/v1/projects/fcmtest-9fe99/messages:send' SCOPES = ["https://www.googleapis.com/auth/firebase.messaging"] def get_access_token(): credentials = ServiceAccountCredentials.from_json_keyfile_name( 'service-account.json', SCOPES) access_token_info = credentials.get_access_token() print 'token:', access_token_info.access_token return access_token_info.access_token # g_accessToken = "abcd" # start with illegal value g_accessToken = get_access_token() # Backoff strategy # # A message is considered in need of delivery as long as its in the # msgs table, and the expected behavior is that as soon as a device # receives a GCM notification it fetches all messages so that they're # deleted. But when a device is offline we don't get any errors, so # while a message remains in that table we need to be sure we don't # ask GCM to contact the device too often. # # But it's devices we contact, not messages. A device is in the # contact list if it is the target of at least one message in the msgs # table. k_shelfFile = path.splitext( path.basename( sys.argv[0]) )[0] + ".shelf" k_SENT = 'SENT' g_useStime = True g_con = None g_sent = None g_debug = False g_skipSend = False # for debugging g_sendAll = False g_columns = [ 'id', 'devid', 'connname', 'hid', 'msg64' ] DEVTYPE_GCM = 3 # 3 == GCM DEVTYPE_FCM = 6 # FCM, from DUtilCtxt.java LINE_LEN = 76 def init(): global g_sent try: user = getpass.getuser() print 'user:', user port = mykey.psqlPort con = psycopg2.connect(port=port, database='xwgames', user=user) except psycopg2.DatabaseError, e: print 'Error %s' % e sys.exit(1) shelf = shelve.open( k_shelfFile ) if k_SENT in shelf: g_sent = shelf[k_SENT] else: g_sent = {} shelf.close(); if g_debug: print 'init(): g_sent:', g_sent return con # WHERE stime IS NULL def getPendingMsgs( con, typ ): cur = con.cursor() query = "SELECT %s FROM msgs WHERE " if g_useStime: query += " stime = 'epoch' AND " query += """ devid IN (SELECT id FROM devices WHERE devtypes[1]=%d and NOT unreg) AND (connname IS NULL OR NOT connname IN (SELECT connname FROM games WHERE dead));""" cur.execute(query % (",".join( g_columns ), typ)) result = [] for row in cur: rowObj = {} for ii in range( len( g_columns ) ): rowObj[g_columns[ii]] = row[ii] result.append( rowObj ) if g_debug: print "getPendingMsgs=>", result return result def addClntVers( con, rows ): query = """select clntVers from devices where id = %d;""" cur = con.cursor() for row in rows: cur.execute( query % (row['devid']) ) if cur.rowcount == 1: row['clntVers'] = cur.fetchone()[0] else: print "bad row count: ", cur.rowcount con.commit() return rows def deleteMsgs( con, msgIDs ): if 0 < len( msgIDs ): if g_useStime: query = "UPDATE msgs SET stime = 'now' where id in (%s);" % ",".join(msgIDs) else: query = "DELETE from msgs where id in (%s);" % ",".join(msgIDs) try: cur = con.cursor() cur.execute(query) con.commit() except psycopg2.DatabaseError, e: print 'Error %s' % e except Exception as inst: print "failed to execute", query print type(inst) print inst.args print inst def unregister( gcmid ): global g_con print "unregister(", gcmid, ")" query = "UPDATE devices SET unreg=TRUE WHERE devids[1] = '%s' and devtypes[1] = 3" % gcmid g_con.cursor().execute( query ) g_con.commit() def asGCMIds(con, devids, typ): cur = con.cursor() query = "SELECT devids[1] FROM devices WHERE devtypes[1] = %d AND id IN (%s)" \ % (typ, ",".join([str(y) for y in devids])) cur.execute( query ) result = [elem[0] for elem in cur.fetchall()] if g_debug: print 'asGCMIds() =>', result return result def notifyViaFCM( devids, typ, target ): global g_accessToken success = False if typ == DEVTYPE_FCM: if 'clntVers' in target and 3 <= target['clntVers'] and target['msg64']: # data = { 'msgs64': str([target['msg64']]) }, data = { 'msgs64': json.dumps([target['msg64']]) } if target['connname'] and target['hid']: data['connname'] = "%s/%d" % (target['connname'], target['hid']) else: data = { 'getMoves': True, } values = { 'message' : { 'token' : devids[0], 'data' : data, } } params = json.dumps( values ) if g_skipSend: print print "not sending:", params else: for ignore in [True, True]: # try twice at most req = urllib2.Request( FCM_URL, params ) req.add_header( 'Authorization', 'Bearer ' + g_accessToken ) req.add_header( 'Content-Type', 'application/json' ) try: response = urllib2.urlopen( req ).read() asJson = json.loads( response ) if 'success' in asJson and 'failure' in asJson and len(devids) == asJson['success'] \ and 0 == asJson['failure']: print "OK; no failures" success = True else: print "Errors: " print response break except urllib2.URLError as e: print 'error from urlopen:', e.reason if e.reason == 'Unauthorized': g_accessToken = get_access_token() else: break else: print "not sending to", len(devids), "devices because typ ==", typ return success def shouldSend(val): return g_sendAll or val == 1 # pow = 1 # while pow < val: # pow *= 3 # return pow == val # given a list of msgid, devid lists, figure out which messages should # be sent/resent now and mark them as sent. Backoff is based on # msgids: if the only messages a device has pending have been seen # before, backoff applies. def targetsAfterBackoff( msgs ): global g_sent targets = {} for row in msgs: msgid = row['id'] devid = row['devid'] if not msgid in g_sent: g_sent[msgid] = 0 g_sent[msgid] += 1 if shouldSend( g_sent[msgid] ): targets[devid] = row return targets # devids is an array of (msgid, devid) tuples def pruneSent( devids ): global g_sent if g_debug: print "pruneSent: before:", g_sent lenBefore = len(g_sent) msgids = [] for row in devids: msgids.append(row['id']) for msgid in g_sent.keys(): if not msgid in msgids: del g_sent[msgid] if g_debug: print "pruneSent: after:", g_sent def cleanup(): global g_con, g_sent if g_con: g_con.close() g_con = None shelf = shelve.open( k_shelfFile ) shelf[k_SENT] = g_sent shelf.close(); def handleSigTERM( one, two ): print 'handleSigTERM called: ', one, two cleanup() def usage(): print "usage:", sys.argv[0], "[--loop ] [--type typ] [--verbose]" sys.exit(); def main(): global g_con, g_sent, g_debug loopInterval = 0 g_con = init() emptyCount = 0 typ = DEVTYPE_FCM ii = 1 while ii < len(sys.argv): arg = sys.argv[ii] if arg == '--loop': ii += 1 loopInterval = float(sys.argv[ii]) elif arg == '--type': ii += 1 typ = int(sys.argv[ii]) elif arg == '--verbose': g_debug = True else: usage() ii = ii + 1 signal.signal( signal.SIGTERM, handleSigTERM ) signal.signal( signal.SIGINT, handleSigTERM ) while g_con: if g_debug: print devids = getPendingMsgs( g_con, typ ) if 0 < len(devids): devids = addClntVers( g_con, devids ) targets = targetsAfterBackoff( devids ) if 0 < len(targets): if 0 < emptyCount: print "" emptyCount = 0 print strftime("%Y-%m-%d %H:%M:%S", time.localtime()), if g_debug: print "devices needing notification:", targets, '=>', toDelete = [] for devid in targets.keys(): target = targets[devid] if notifyViaFCM( asGCMIds(g_con, [devid], typ), typ, target ) \ and 3 <= target['clntVers'] \ and target['msg64']: toDelete.append( str(target['id']) ) pruneSent( devids ) deleteMsgs( g_con, toDelete ) elif g_debug: print "no targets after backoff" else: emptyCount += 1 if (0 == (emptyCount%5)) and not g_debug: sys.stdout.write('.') sys.stdout.flush() if 0 == (emptyCount % (LINE_LEN*5)): print "" if 0 == loopInterval: break time.sleep( loopInterval ) cleanup() ############################################################################## if __name__ == '__main__': main()