diff options
-rw-r--r-- | session/crypto.py | 1 | ||||
-rw-r--r-- | session/omq.py | 0 | ||||
-rw-r--r-- | session/snode.py | 110 | ||||
-rw-r--r-- | session/storage/handlers/base.py | 18 | ||||
-rw-r--r-- | session/storage/handlers/storage.py | 55 |
5 files changed, 125 insertions, 59 deletions
diff --git a/session/crypto.py b/session/crypto.py index 77152c7..07b4c8f 100644 --- a/session/crypto.py +++ b/session/crypto.py @@ -7,3 +7,4 @@ def parse_pubkey(hexstr): return PubKey(hexstr, encoder=HexEncode) except: return None + diff --git a/session/omq.py b/session/omq.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/session/omq.py diff --git a/session/snode.py b/session/snode.py index 2c2f6b9..5fab53a 100644 --- a/session/snode.py +++ b/session/snode.py @@ -1,14 +1,102 @@ from binascii import hexlify, unhexlify -from session import base32z +from session import base32z, crypto, utils + +from nacl.signing import SigningKey +from nacl.encoding import HexEncoder + +import json + +class _OMQRPC: + """ + rpc caller for rpc method to oxend + """ + + def _call(self, method, **kwargs): + conn = omq.get_conn() + args = json.dumps(**kwargs) + conn.request(method, args.encode('ascii')) + + def _make_caller(self, method): + def call(**kwargs): + return self._call(method, *kwargs) + return call + + + def __getattribute__(self, name): + return self._make_caller(name) + + +class SNodeState: + + _caller = _OMQRPC() + + def __init__(self): + self._privkey_ed25519 = None + self._block_hash = None + self._snode_list = None + self._snode_update_listeners = list() + + def _update_snode_list(self, cur_block_hash): + """ update the service node list using a current block hash """ + if cur_block_hash: + self._block_hash = cur_block_hash + fields = dict() + for member in ["swarm_id", "storage_port", "public_ip", "pubkey_x25519", "pubkey_ed25519", "storage_lmq_port"]: + fields[member] = True + result = self._caller.get_n_service_nodes(active_only=True, poll_block_hash=self._block_hash, fields=fields) + if result is None or ('unchanged' in result['result'] and result['unchanged']): + # bail + return + snode_list = dict() + for state in result['service_node_states']: + snode_list[state['serivce_node_pubkey']] = SNodeInfo(state) + if snode_list.empty(): + # bail + return + self._snode_list = snode_list + for listener in self._snode_update_listeners: + listener() + + def add_snodelist_change_listener(self, hook): + self._snode_update_listeners.append(hook) + + def get_swarm_nodes_for_pk(self, pk): + """ given a hex pubkey get a dict of SNodeInfo mapping pubkey to snodeinfo for our swarm """ + if self._snode_list is None: + self._update_snode_list(self._block_hash) + if self._snode_list is None: + # bail + return + + + def get_our_pk(self): + """ + get our public key if we don't already have it + """ + if self._privkey_ed25519 is None: + result = self._caller.get_service_node_privkey() + if result is None or not utils.contains_members(result, "service_node_ed25519_privkey"): + # bail + return + self._privkey_ed25519 = SigningKey(result["service_node_ed25519_privkey"], encoding=HexEncoder) + return self._privkey_ed25519.verify_key.encode(encoding=HexEncoder) + + +_state = SNodeState() + +get_swarm_nodes_for_pk = _state.get_swarm_nodes_for_pk +get_our_pk = _state.get_our_pk class SNodeInfo: def __init__(self, jobj): - self._xkey_raw = unhexlify(jobj["x25519_pubkey"]) - self._edkey_raw = unhexlify(jobj["ed25519_pubkey"]) - self._port = int(jobj["port"]) - self._ip = jobj["ip"] + self._xkey_raw = unhexlify(jobj["pubkey_x25519"]) + self._edkey_raw = unhexlify(jobj["pubkey_ed25519"]) + self._storage_port = int(jobj["storage_port"]) + self._lmq_port = int(jobj["storage_lmq_port"]) + self._ip = jobj["public_ip"] + self._swarm_id = int(jobj["swarm_id"]) def snode_addr(self): """ return .snode address """ @@ -21,21 +109,21 @@ class SNodeInfo: def ed25519_pubkey(self): """ return ed25519 public key as hex""" return hexlify(self._edkey_raw) - - def port(self): - """ return port as integer """ - return self._port def ip_addr(self): """ return advertised ip addess """ return self._ip + + def swarm(self): + """ return our swarm id """ + return self._swarm_id def to_object(self): """ - convert into a serializable json object + convert into a serializable json object for clients """ return {"address": self.snode_addr(), "pubkey_x25519": self.x25519_pubkey(), "pubkey_ed25519": self.ed25519_pubkey(), - "port": str(self.port()), + "port": str(self._storage_port), "ip": self.ip_addr()} diff --git a/session/storage/handlers/base.py b/session/storage/handlers/base.py index 176dec4..5799e6b 100644 --- a/session/storage/handlers/base.py +++ b/session/storage/handlers/base.py @@ -1,7 +1,8 @@ -from flask import Response +from flask import request, Response from flask_restful import Resource import json +from session import utils def respond(status, body, content_type='plain'): if content_type == 'json': @@ -13,12 +14,21 @@ def respond(status, body, content_type='plain'): class BaseRPC(Resource): - def post(self, method, params): + def post(self): return respond(501, "not implemented") - def jsonrpc(self, method, params): + def jsonrpc(self): + try: + jobj = request.get_json(force=True) + except: + return respond(400, "not jsonrpc") + if not utils.contains_members(jobj, "method", "params"): + return respond(400, "invald jsonrpc request") + + handler = 'handle_{}'.format(method) if not hasattr(self, handler): return respond(400, "no such method: {}".format(method)) - return getattr(self, handler)(params) + return getattr(self, handler)(**params) + diff --git a/session/storage/handlers/storage.py b/session/storage/handlers/storage.py index 38a4038..7281cc3 100644 --- a/session/storage/handlers/storage.py +++ b/session/storage/handlers/storage.py @@ -1,41 +1,15 @@ -from session import crypto, snode, utils +from session import snode, utils from session.storage.handlers.base import BaseRPC, respond from session.message import swarm_message_router from flask_restful_swagger import swagger - - class StorageRPC(BaseRPC): @swagger.operation( notes="stores a message to the storage server backend", nickname="storage rpc", - parameters=[ - { - "name": "pubKey", - "description" : "public key of the recipiant", - "required": True, - "allowMultiple": False, - "dataType": "string" - }, - { - "name": "ttl", - "description" : "time to live of this message we are submitting, integer but inside a string :^)", - "required": True, - "allowMultiple": False, - "dataType": "string" - }, - { - "name": "timestamp", - "description" : "timestamp of this message, integer inside a string", - "required": True, - "allowMultiple": False, - "dataType": "string" - }, - - - ], + description="store a message", responseMessages=[ { "code": 200, @@ -59,10 +33,9 @@ class StorageRPC(BaseRPC): }, ] ) - def post(self, method, params): - return self.jsonrpc(method, params) + def post(self): + return self.jsonrpc() - MaxMessageBody = utils.MB(1) MaxTimeSkew = utils.seconds(10) @@ -77,27 +50,21 @@ class StorageRPC(BaseRPC): return util.within_range(ttl, self.MinTTL, self.MaxTTL) and utils.within_range(timestamp, now - ttl, now + self.MaxTimeSkew) - def handle_store(self, params): + def handle_store(self, pubKey, ttl, timestamp, data): """ store a message """ - if not utils.contains_members(params, 'pubKey', 'ttl', 'timestamp', 'data'): - return respond(400, "invalid message format") - if len(params['data']) > self.MaxMessageBody: + if len(data) > self.MaxMessageBody: return respond(400, "message is too big") - - timestamp = int(params['timestamp']) - ttl = int(params['ttl']) + + timestamp = int(timestamp) + ttl = int(ttl) if not self.timestamp_is_valid(timestamp, ttl): return respond(403, "message timestamp invalid") - - pk = crypto.parse_pubkey(params['pubKey']) - if pk is None: - abort(400) - snodes = snode.get_swarm_nodes_for_pk(pk) + snodes = snode.get_swarm_nodes_for_pk(pubKey) our_pk = snode.get_our_pk() if our_pk not in snodes: # we are in the wrong snode @@ -110,7 +77,7 @@ class StorageRPC(BaseRPC): # obtain a message router to store, fanout and notify about a messagep try: with swarm_message_router() as router: - router.got_message_from(pk, params['data']) + router.got_message_from(pubKey, data) except Exception as ex: return respond(500, "failed to handle message: {}".format(ex)) else: |