summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--session/crypto.py1
-rw-r--r--session/omq.py0
-rw-r--r--session/snode.py110
-rw-r--r--session/storage/handlers/base.py18
-rw-r--r--session/storage/handlers/storage.py55
5 files changed, 125 insertions, 59 deletions
diff --git a/session/crypto.py b/session/crypto.py
index 77152c7..07b4c8f 100644
--- a/session/crypto.py
+++ b/session/crypto.py
@@ -7,3 +7,4 @@ def parse_pubkey(hexstr):
return PubKey(hexstr, encoder=HexEncode)
except:
return None
+
diff --git a/session/omq.py b/session/omq.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/session/omq.py
diff --git a/session/snode.py b/session/snode.py
index 2c2f6b9..5fab53a 100644
--- a/session/snode.py
+++ b/session/snode.py
@@ -1,14 +1,102 @@
from binascii import hexlify, unhexlify
-from session import base32z
+from session import base32z, crypto, utils
+
+from nacl.signing import SigningKey
+from nacl.encoding import HexEncoder
+
+import json
+
+class _OMQRPC:
+ """
+ rpc caller for rpc method to oxend
+ """
+
+ def _call(self, method, **kwargs):
+ conn = omq.get_conn()
+ args = json.dumps(**kwargs)
+ conn.request(method, args.encode('ascii'))
+
+ def _make_caller(self, method):
+ def call(**kwargs):
+ return self._call(method, *kwargs)
+ return call
+
+
+ def __getattribute__(self, name):
+ return self._make_caller(name)
+
+
+class SNodeState:
+
+ _caller = _OMQRPC()
+
+ def __init__(self):
+ self._privkey_ed25519 = None
+ self._block_hash = None
+ self._snode_list = None
+ self._snode_update_listeners = list()
+
+ def _update_snode_list(self, cur_block_hash):
+ """ update the service node list using a current block hash """
+ if cur_block_hash:
+ self._block_hash = cur_block_hash
+ fields = dict()
+ for member in ["swarm_id", "storage_port", "public_ip", "pubkey_x25519", "pubkey_ed25519", "storage_lmq_port"]:
+ fields[member] = True
+ result = self._caller.get_n_service_nodes(active_only=True, poll_block_hash=self._block_hash, fields=fields)
+ if result is None or ('unchanged' in result['result'] and result['unchanged']):
+ # bail
+ return
+ snode_list = dict()
+ for state in result['service_node_states']:
+ snode_list[state['serivce_node_pubkey']] = SNodeInfo(state)
+ if snode_list.empty():
+ # bail
+ return
+ self._snode_list = snode_list
+ for listener in self._snode_update_listeners:
+ listener()
+
+ def add_snodelist_change_listener(self, hook):
+ self._snode_update_listeners.append(hook)
+
+ def get_swarm_nodes_for_pk(self, pk):
+ """ given a hex pubkey get a dict of SNodeInfo mapping pubkey to snodeinfo for our swarm """
+ if self._snode_list is None:
+ self._update_snode_list(self._block_hash)
+ if self._snode_list is None:
+ # bail
+ return
+
+
+ def get_our_pk(self):
+ """
+ get our public key if we don't already have it
+ """
+ if self._privkey_ed25519 is None:
+ result = self._caller.get_service_node_privkey()
+ if result is None or not utils.contains_members(result, "service_node_ed25519_privkey"):
+ # bail
+ return
+ self._privkey_ed25519 = SigningKey(result["service_node_ed25519_privkey"], encoding=HexEncoder)
+ return self._privkey_ed25519.verify_key.encode(encoding=HexEncoder)
+
+
+_state = SNodeState()
+
+get_swarm_nodes_for_pk = _state.get_swarm_nodes_for_pk
+get_our_pk = _state.get_our_pk
class SNodeInfo:
def __init__(self, jobj):
- self._xkey_raw = unhexlify(jobj["x25519_pubkey"])
- self._edkey_raw = unhexlify(jobj["ed25519_pubkey"])
- self._port = int(jobj["port"])
- self._ip = jobj["ip"]
+ self._xkey_raw = unhexlify(jobj["pubkey_x25519"])
+ self._edkey_raw = unhexlify(jobj["pubkey_ed25519"])
+ self._storage_port = int(jobj["storage_port"])
+ self._lmq_port = int(jobj["storage_lmq_port"])
+ self._ip = jobj["public_ip"]
+ self._swarm_id = int(jobj["swarm_id"])
def snode_addr(self):
""" return .snode address """
@@ -21,21 +109,21 @@ class SNodeInfo:
def ed25519_pubkey(self):
""" return ed25519 public key as hex"""
return hexlify(self._edkey_raw)
-
- def port(self):
- """ return port as integer """
- return self._port
def ip_addr(self):
""" return advertised ip addess """
return self._ip
+
+ def swarm(self):
+ """ return our swarm id """
+ return self._swarm_id
def to_object(self):
"""
- convert into a serializable json object
+ convert into a serializable json object for clients
"""
return {"address": self.snode_addr(),
"pubkey_x25519": self.x25519_pubkey(),
"pubkey_ed25519": self.ed25519_pubkey(),
- "port": str(self.port()),
+ "port": str(self._storage_port),
"ip": self.ip_addr()}
diff --git a/session/storage/handlers/base.py b/session/storage/handlers/base.py
index 176dec4..5799e6b 100644
--- a/session/storage/handlers/base.py
+++ b/session/storage/handlers/base.py
@@ -1,7 +1,8 @@
-from flask import Response
+from flask import request, Response
from flask_restful import Resource
import json
+from session import utils
def respond(status, body, content_type='plain'):
if content_type == 'json':
@@ -13,12 +14,21 @@ def respond(status, body, content_type='plain'):
class BaseRPC(Resource):
- def post(self, method, params):
+ def post(self):
return respond(501, "not implemented")
- def jsonrpc(self, method, params):
+ def jsonrpc(self):
+ try:
+ jobj = request.get_json(force=True)
+ except:
+ return respond(400, "not jsonrpc")
+ if not utils.contains_members(jobj, "method", "params"):
+ return respond(400, "invald jsonrpc request")
+
+
handler = 'handle_{}'.format(method)
if not hasattr(self, handler):
return respond(400, "no such method: {}".format(method))
- return getattr(self, handler)(params)
+ return getattr(self, handler)(**params)
+
diff --git a/session/storage/handlers/storage.py b/session/storage/handlers/storage.py
index 38a4038..7281cc3 100644
--- a/session/storage/handlers/storage.py
+++ b/session/storage/handlers/storage.py
@@ -1,41 +1,15 @@
-from session import crypto, snode, utils
+from session import snode, utils
from session.storage.handlers.base import BaseRPC, respond
from session.message import swarm_message_router
from flask_restful_swagger import swagger
-
-
class StorageRPC(BaseRPC):
@swagger.operation(
notes="stores a message to the storage server backend",
nickname="storage rpc",
- parameters=[
- {
- "name": "pubKey",
- "description" : "public key of the recipiant",
- "required": True,
- "allowMultiple": False,
- "dataType": "string"
- },
- {
- "name": "ttl",
- "description" : "time to live of this message we are submitting, integer but inside a string :^)",
- "required": True,
- "allowMultiple": False,
- "dataType": "string"
- },
- {
- "name": "timestamp",
- "description" : "timestamp of this message, integer inside a string",
- "required": True,
- "allowMultiple": False,
- "dataType": "string"
- },
-
-
- ],
+ description="store a message",
responseMessages=[
{
"code": 200,
@@ -59,10 +33,9 @@ class StorageRPC(BaseRPC):
},
]
)
- def post(self, method, params):
- return self.jsonrpc(method, params)
+ def post(self):
+ return self.jsonrpc()
-
MaxMessageBody = utils.MB(1)
MaxTimeSkew = utils.seconds(10)
@@ -77,27 +50,21 @@ class StorageRPC(BaseRPC):
return util.within_range(ttl, self.MinTTL, self.MaxTTL) and utils.within_range(timestamp, now - ttl, now + self.MaxTimeSkew)
- def handle_store(self, params):
+ def handle_store(self, pubKey, ttl, timestamp, data):
"""
store a message
"""
- if not utils.contains_members(params, 'pubKey', 'ttl', 'timestamp', 'data'):
- return respond(400, "invalid message format")
- if len(params['data']) > self.MaxMessageBody:
+ if len(data) > self.MaxMessageBody:
return respond(400, "message is too big")
-
- timestamp = int(params['timestamp'])
- ttl = int(params['ttl'])
+
+ timestamp = int(timestamp)
+ ttl = int(ttl)
if not self.timestamp_is_valid(timestamp, ttl):
return respond(403, "message timestamp invalid")
-
- pk = crypto.parse_pubkey(params['pubKey'])
- if pk is None:
- abort(400)
- snodes = snode.get_swarm_nodes_for_pk(pk)
+ snodes = snode.get_swarm_nodes_for_pk(pubKey)
our_pk = snode.get_our_pk()
if our_pk not in snodes:
# we are in the wrong snode
@@ -110,7 +77,7 @@ class StorageRPC(BaseRPC):
# obtain a message router to store, fanout and notify about a messagep
try:
with swarm_message_router() as router:
- router.got_message_from(pk, params['data'])
+ router.got_message_from(pubKey, data)
except Exception as ex:
return respond(500, "failed to handle message: {}".format(ex))
else: