(mOSAIC) Interoperability
Contents
Design
Responsibilities
- encapsulation -- yes;
- addressing -- yes;
- sequencing -- no;
- segmentation / reassembly -- no;
- connection control -- no;
- flow control -- no;
- error control -- no;
- multiplexing -- yes;
- authentication -- no;
- authorization -- no;
- encryption -- no;
- compression -- no;
Features
- state-full vs. state-less (i.e. the notion of session) (maybe hierarchical / nested sessions);
- pipelining vs. request-reply (i.e. multiple outstanding requests);
- (in the case of pipelining) synchronous vs. asynchronous (i.e. out of order replies);
- single request-reply semantic vs. state-machine;
- global vs. connection uniqueness (i.e. call identifier, agent addresses, etc.);
- transparent vs. opaque encoding;
- unidirectional vs. bidirectional (i.e. can the server also initiate requests);
- with / without "notifications" (i.e. requests without replies);
- with / without "headers" (like SOAP headers);
Implementations
Version 1
- Java implementation:
- Erlang implementation:
Version 2
# Usage example; the `...` placeholders stand for application-specific values.
_bucket = ...
_channel = ...
_endpoint = ...
# The constructor waits for the outcome of the session set-up before returning.
_connector = KvConnector (_bucket, _channel, _endpoint)
class KvConnector :
    """Blocking key-value connector layered over a callback-driven `KvSession`.

    Each public operation creates an `AsyncResult`, hands a callback to the
    session, and waits on that result until the session reports an outcome.

    NOTE(review): `AsyncResult` and `Queue` look like the gevent primitives
    (`gevent.event.AsyncResult`, `gevent.queue.Queue`) -- confirm.
    """

    def __init__ (self, _bucket, _channel, _endpoint) :
        """Create the session for `_bucket` and wait for its set-up outcome.

        Raises `Exception (reason)` when the session reports a failure
        (through `_failed`) before it reports success.
        """
        self._active = False
        self._create_outcome = AsyncResult ()
        self._session = KvSession (self, _bucket, _channel, _endpoint)
        # Wait until the session calls back `_succeeded` or `_failed`.
        self._create_outcome.get ()
        self._create_outcome = None

    def _succeeded (self) :
        """Session callback: the session is usable; releases the constructor."""
        self._active = True
        # Must be the same `AsyncResult` the constructor waits on; guarded the
        # same way as in `_failed`, as it is reset to `None` after set-up.
        if self._create_outcome is not None :
            self._create_outcome.set (None)

    def _failed (self, _reason) :
        """Session callback: the session failed for `_reason`."""
        self._active = False
        if self._create_outcome is not None :
            self._create_outcome.set_exception (Exception (_reason))

    def _terminated (self) :
        """Session callback: the session was terminated."""
        self._active = False

    def get (self, _key) :
        """Return the value stored under `_key`, waiting for the reply.

        Raises `Exception (reason)` when the session reports a failure.
        """
        _outcome = AsyncResult ()
        def _callback (_succeeded, _valueOrReason) :
            if _succeeded :
                _outcome.set (_valueOrReason)
            else :
                _outcome.set_exception (Exception (_valueOrReason))
        self._session.get (_key, _callback)
        return _outcome.get ()

    def list (self) :
        """Yield the keys as the session delivers them.

        The session invokes the callback with `(True, keys)` for each batch,
        `(True, None)` at the end of the listing, or `(False, reason)` on
        failure; in the latter case `Exception (reason)` is raised once the
        already queued keys have been yielded.
        """
        _outcome = AsyncResult ()
        _queue = Queue ()
        def _callback (_succeeded, _keysOrReason) :
            if _succeeded :
                if _keysOrReason is not None :
                    for _key in _keysOrReason :
                        _queue.put (_key)
                else :
                    _outcome.set (None)
                    _queue.put (StopIteration)
            else :
                _outcome.set_exception (Exception (_keysOrReason))
                _queue.put (StopIteration)
        # Without this call nothing ever feeds the queue and the loop below
        # would wait forever.
        self._session.list (_callback)
        # Presumably relies on the queue being iterable and treating the
        # `StopIteration` class as the end-of-stream sentinel -- confirm.
        for _key in _queue :
            yield _key
        # Re-raises the failure, if any, after the queue has been drained.
        _outcome.get ()
class KvSession :
    """Master session of the key-value protocol, driven by channel callbacks.

    States: 'new' -> 'pending-access' -> 'connected' -> 'terminated', with
    'failed' reachable on access denial or on a reported failure.
    """

    def __init__ (self, _connector, _bucket, _channel, _endpoint) :
        """Resolve the session definitions and send the access request for `_bucket`."""
        self._state = 'new'
        self._connector = _connector
        self._channel = _channel
        self._session = None
        self._master_session_definition = self._channel.resolve_session_definition ("<<uuid>>")
        self._get_operation_session_definition = self._channel.resolve_session_definition ("<<uuid>>")
        self._list_operation_session_definition = self._channel.resolve_session_definition ("<<uuid>>")
        _message = KvAccessRequestMessage (_bucket)
        _channel_connector = self._channel.create_connector ()
        _channel_connector.connect (_endpoint, self._master_session_definition, self, _message)
        _channel_connector.destroy ()

    # part of the interoperability `SessionCallbacks` interface
    def created (self, _session, _message) :
        """Record the underlying session once the access request has created it."""
        assert self._state == 'new'
        assert isinstance (_message, KvAccessRequestMessage)
        self._session = _session
        self._state = 'pending-access'

    # part of the interoperability `SessionCallbacks` interface
    def received (self, _session, _message) :
        """Advance the state machine on an incoming master-session message."""
        if self._state == 'pending-access' :
            if isinstance (_message, KvAccessGrantedMessage) :
                self._state = 'connected'
                self._connector._succeeded ()
            elif isinstance (_message, KvAccessDeniedMessage) :
                self._state = 'failed'
                self._connector._failed ('access-denied')
            else :
                assert False
        elif self._state == 'connected' :
            if isinstance (_message, KvTerminationMessage) :
                self._state = 'terminated'
                self._connector._terminated ()
            else :
                assert False
        else :
            assert False

    # part of the interoperability `SessionCallbacks` interface
    # NOTE(review): the operation sessions below declare
    # `failed (self, _session, _reason)`; confirm which arity the
    # `SessionCallbacks` interface really uses.
    def failed (self, _reason) :
        """Report a failure of the master session to the connector."""
        self._connector._failed ('driver-failure')
        self._state = 'failed'

    # delegate from `KvConnector`
    def get (self, _key, _callback) :
        """Request the value of `_key`; `_callback (succeeded, valueOrReason)` is called exactly once."""
        assert self._state == 'connected'
        def _callback_wrapper (_succeeded, _message) :
            # A single `if / elif` chain, so that a failure is reported only
            # once instead of falling through to the message-type checks.
            if not _succeeded :
                _callback (False, 'driver-failure')
            elif isinstance (_message, KvGetValueResponseMessage) :
                _callback (True, _message.value)
            elif isinstance (_message, KvGetErrorResponseMessage) :
                _callback (False, _errors[_message.reason])
            else :
                _callback (False, 'driver-failure')
        _callbacks = KvGenericOperationSession (_callback_wrapper)
        _message = KvGetRequestMessage (_key)
        self._session.sendAndSpawn (self._get_operation_session_definition, _callbacks, _message)

    # delegate from `KvConnector`
    def list (self, _callback) :
        """Request the key listing; `_callback` is called per batch, at the end, or on failure."""
        assert self._state == 'connected'
        _callbacks = KvListOperationSession (_callback)
        _message = KvListRequestMessage ()
        self._session.sendAndSpawn (self._list_operation_session_definition, _callbacks, _message)


class KvListOperationSession :
    """Nested session for one `list` operation (several replies, then an end or an error)."""

    def __init__ (self, _callback) :
        self._state = 'new'
        self._callback = _callback

    def created (self, _session, _message) :
        assert self._state == 'new'
        self._state = 'pending'

    def received (self, _session, _message) :
        """Forward a batch of keys, the end-of-list marker (`None`), or an error."""
        assert self._state == 'pending'
        if isinstance (_message, KvListKeysResponseMessage) :
            self._callback (True, _message.keys)
        elif isinstance (_message, KvListEndResponseMessage) :
            # The end of the listing completes the operation as well, so that
            # any later message trips the state assertion above.
            self._state = 'completed'
            self._callback (True, None)
        elif isinstance (_message, KvListErrorResponseMessage) :
            self._callback (False, _errors[_message.reason])
            self._state = 'completed'
        else :
            assert False

    def failed (self, _session, _reason) :
        assert self._state == 'pending'
        self._state = 'failed'
        self._callback (False, 'driver-failure')


class KvGenericOperationSession :
    """Nested session for a single request / single reply operation."""

    def __init__ (self, _callback) :
        self._state = 'new'
        self._callback = _callback

    def created (self, _session, _message) :
        assert self._state == 'new'
        self._state = 'pending'

    def received (self, _session, _message) :
        """Complete the operation and hand the raw reply to the callback."""
        assert self._state == 'pending'
        self._state = 'completed'
        self._callback (True, _message)

    def failed (self, _session, _reason) :
        """Fail the operation and hand the reason to the callback."""
        assert self._state == 'pending'
        self._state = 'failed'
        self._callback (False, _reason)
Proposals
Proposal A
Features:
- state-full and state-less (i.e. we can choose to have sessions);
- asynchronous pipelining;
- bidirectional;
- state-machine for both session and operation;
- with "notifications";
Message types:
- categorized by scope:
- state-full:
- session messages;
- operation messages;
- notification messages;
- state-less:
- operation messages;
- notification messages;
- state-full:
- categorized by semantic:
- session messages:
initiation request (initiator -> target);
initiation acknowledgment (target -> initiator);
termination request (any -> any);
termination acknowledgment (any -> any);
exchange (any -> any);
- session / stand-alone operation messages:
initiation (initiator -> target);
termination (target -> initiator);
exchange (any -> any);
- session / stand-alone notification messages:
initiation (initiator -> target);
termination (target -> initiator);
exchange (initiator -> target);
- session messages:
Message encoding:
message ::= { type :: enumeration, case type of session-initiation-req, session-initiation-ack -> initiator :: end-point, target :: end-point, session; session-termination-req, session-termination-ack -> session; session-exchange -> session; session-operation-initiation | session-notification-initiation -> session, operation, method, payload; session-operation-termination | session-notification-termination -> session, operation; session-operation-exchange | session-notification-exchange -> session, operation, method, payload; standalone-operation-initiation | standalone-notification-initiation -> initiator :: end-point, target :: end-point, operation, method, payload; standalone-operation-termination | standalone-notification-termination -> initiator :: end-point, operation; standalone-operation-exchange | standalone-notification-exchange -> initiator :: end-point, operation, method, payload end } end-point ::= universally-unique-identifier session ::= universally-unique-identifier operation ::= scoped-unique-identifier(session|initiator) method ::= { identifier :: universally-unique-identifier, version :: scoped-unique-identifier(identifier) } payload ::= { encoding :: universally-unique-identifier, data :: length-prefixed-binary-string }
Examples
Specification example:
inspired by "Getting Erlang to talk to the outside world";
# key-value put operation, stateless ...
Proposal B
Features:
- nested sessions;
- global symmetric (i.e. both parties can play the role of session initiator and target, if the state transitions allow it);
- session asymmetric (i.e. in the scope of a particular session one peer is the initiator and the other is the target);
- multi-layered state machines (i.e. state transitions in a session machine depend on states in the parent session machine);
- asynchronous pipelining;
- unifying the concept of operations and notifications (i.e. the concept of operations from proposal A is transformed into a child session with exchanges inside);
- normal messages can be piggy-backed on session control messages (i.e. a request-reply operation needs a session, but the request message both creates the session, and at the same time conveys the method and payload; the response the same, it conveys both the response and the session termination);
- partial ordering (i.e. multiple messages sent in nesting sessions are received in the same order, but messages in sibling sessions can have any ordering);
- supports tagging (i.e. SOAP-like headers);
Message types:
- session control:
initiation request (initiator -> target);
termination request (target -> initiator);
- callbacks;
Message encoding:
message ::= { type :: enumeration, case type of master-session-initiation-request -> initiator :: end-point, target :: end-point; nested-session-initiation-request -> nesting-session; {master,nested}-session-initiation-request -> protocol; {master,nested}-session-{initiation,termination}-{request,acknowledgment}, callback-message -> session; callback-message -> % void end, payload-count :: count, payload * } end-point, session, nesting-session, protocol, method ::= universally-unique-identifier payload ::= { semantic :: universally-unique-identifier, role :: universally-unique-identifier, mandatory :: boolean, encoding :: universally-unique-identifier, schema :: scoped-unique-identifier(method), data :: length-prefixed-binary-string }
Examples
Simple (incomplete) example
(session file-system
  (id 795e2d88-a5da-4194-a4fe-4d8bcee3ccf8)
  (roles
    (client (id f9b7ec52-9a02-4f78-93f5-abb242fb33dd))
    (server (id 66b6b982-1e14-429b-80c9-0f8cf927910a)))
  (messages
    (access-request
      (type master-session-initiation-request)
      (payloads
        (access-request-payload
          (semantic a3a7c717-266d-4926-8a37-5b1281aab55e)
          (role server)
          (mandatory true)
          (data
            (encoding protocol-buffers)
            (schema a3a7c717-266d-4926-8a37-5b1281aab55e) % the same as the semantic
            (protocol-buffers-schema @{ ... protocol buffers specific specification })))))
    (access-granted ...)
    (close-request ...)
    (close-acknowledge ...)
  )
  (states
    (access (type initial) (roles client server))
    (ready (type intermediary) (roles client server))
    (closed (type final) (roles client server)))
  (transitions
    % Every `from` / `to` names one of the states declared above (or `void`,
    % used here as the source of the initial transitions).
    % NOTE(review): `access-refused` is not declared in `messages` above;
    % presumably elided because the example is incomplete -- confirm.
    ((role client) (from void) (to access) (send access-request))
    ((role server) (from void) (to access) (receive access-request))
    ((role server) (from access) (to ready) (send access-granted))
    ((role server) (from access) (to closed) (send access-refused))
    ((role client) (from access) (to ready) (receive access-granted))
    ((role client) (from access) (to closed) (receive access-refused))
  )
)
More complex example
#lang s-exp "protocol.rkt"

(protocol
  
  (name connection)
  
  ; The protocol identifier should be a unique, opaque and constant UUID used to internally mark packets on the wire.
  ; It should be changed when making any backward incompatible changes. (This means almost always before a stable release.)
  (identifier "e7b45267-da3c-4954-b83e-c0630ae4bdef")
  
  ; Inspired from SOAP roles.
  ; Usually there are only two roles; but there are cases where we'll describe related protocol sessions (connections)
  ; involving multiple roles. (For example in message queues we have `publisher`, `queue/broker`, and `consumer`.)
  (roles
    ( (name client) (identifier "d6954b86-a74e-4e72-b026-366c13878125"))
    ( (name server) (identifier "7331397f-e4a4-4010-910c-2f2c05ccc4e4")))
  
  (sessions
    
    ( (name connection)
      (identifier "f9a936a5-f09e-4b32-b713-1c54c31422d9")
      (type master)
      (roles client server)
      
      (messages
        ; A message is characterized by a session life-cycle related type and a payload (opaque from this point of view).
        ; The session life-cycle related types are:
        ;  * `initiation` -- sent / received by a peer in order to establish a session; (either independent or nested;)
        ;  * `exchange` -- sent / received by any peer during the life-time of the session;
        ;  * `termination` -- sent / received by any peer to terminate the session both ways;
        ;    (no further message is allowed in any direction after this;)
        ; The payload is application related and has no other meaning for the protocol specification (except encoding).
        ((name access-request) (type initiation))
        ((name access-accept) (type exchange))
        ((name access-reject) (type termination))
        ((name select-request) (type initiation) (payload select-request))
        ((name close) (type termination))
        ((name abort) (type termination)))
      
      (exchanges
        
        ( (role client)
          (states
            ; Each session should have at least one transition from an `initial` state; at least one transition
            ; to a `terminal` state; the `intermediary` states could be missing.
            ; When a session is created it is automatically put in one of these `initial` states.
            (initial created)
            (intermediary access-pending ready)
            (terminal closed aborted))
          (transitions
            ((name access-requesting) (when created) (next access-pending) (send access-request))
            ((name access-accepted) (when access-pending) (next ready) (receive access-accept))
            ((name access-rejected) (when access-pending) (next closed) (receive access-reject))
            ((name disconnected) (when ready) (next closed) (send close))
            ((name select-requesting) (when ready) (send select-request)
              ; The `select` is one example of an operation which does not follow the `request/reply` pattern;
              ; thus we create a nested session of type `select-operation` placing us in the `client` role,
              ; with the initial state `metadata-pending`.
              (spawn select-operation (role client) (state metadata-pending)))
            ((name self-aborted) (when access-pending ready) (next aborted) (send abort))
            ((name peer-aborted) (when access-pending ready) (next aborted) (receive abort))))
        
        ( (role server)
          (states
            (initial created)
            (intermediary access-pending ready)
            (terminal closed aborted))
          (transitions
            ((name access-requested) (when created) (next access-pending) (receive access-request))
            ((name access-accepted) (when access-pending) (next ready) (send access-accept))
            ((name access-rejected) (when access-pending) (next closed) (send access-reject))
            ((name disconnected) (when ready) (next closed) (receive close))
            ((name select-requested) (when ready) (receive select-request)
              (spawn select-operation (role server) (state metadata-pending)))
            ((name self-aborted) (when access-pending ready) (next aborted) (send abort))
            ((name peer-aborted) (when access-pending ready) (next aborted) (receive abort))))))
    
    ( (name select-operation)
      (identifier "089247c8-60ca-46bd-9ddc-303907c317a1")
      (type nested)
      (parent connection)
      (roles client server)
      
      (messages
        ((name select-metadata) (type exchange) (payload select-metadata))
        ((name select-stream-begin) (type exchange))
        ((name select-stream-chunk) (type exchange) (payload select-stream-chunk))
        ((name select-stream-end) (type termination))
        ((name close) (type termination))
        ((name abort) (type termination)))
      
      (exchanges
        ; This `select` operation has the following life-cycle:
        ;  * initially the client sends the `select` (`get`) request, waiting for the meta-data of the associated value;
        ;  * if the client only wanted to access the meta-data (like size, replicas, etc.) it finishes the operation here;
        ;  * if however the client also wants to receive the data it requests the beginning of the data stream;
        ;  * following, as the associated data could be several GB in size, the server doesn't send it all in one message,
        ;    but sends multiple chunks to the client as separate messages;
        ;  * when the server has no data to send it ends the stream with a special message;
        ; Observations:
        ;  * we could have added some options inside the `select-stream-begin` message to specify the part of the
        ;    entire data we are interested in; (like HTTP chunked requests;)
        
        ( (role client)
          (states
            (initial metadata-pending)
            (intermediary stream-pending stream-receiving)
            (terminal closed aborted))
          (transitions
            ((name metadata-received) (when metadata-pending) (next stream-pending) (receive select-metadata))
            ((name stream-beginning) (when stream-pending) (next stream-receiving) (send select-stream-begin))
            ((name closed) (when stream-pending) (next closed) (send close))
            ((name stream-received) (when stream-receiving) (receive select-stream-chunk))
            ((name stream-ended) (when stream-receiving) (next closed) (receive select-stream-end))
            ((name operation-aborting) (when metadata-pending stream-pending stream-receiving) (next aborted) (send abort))
            ((name operation-aborted) (when metadata-pending stream-pending stream-receiving) (next aborted) (receive abort))))
        
        ( (role server)
          (states
            (initial metadata-pending)
            (intermediary stream-pending stream-sending)
            (terminal closed aborted))
          (transitions
            ((name metadata-sent) (when metadata-pending) (next stream-pending) (send select-metadata))
            ((name stream-beginning) (when stream-pending) (next stream-sending) (receive select-stream-begin))
            ((name closed) (when stream-pending) (next closed) (receive close))
            ((name stream-sent) (when stream-sending) (send select-stream-chunk))
            ((name stream-ended) (when stream-sending) (next closed) (send select-stream-end))
            ((name operation-aborting) (when metadata-pending stream-pending stream-sending) (next aborted) (send abort))
            ((name operation-aborted) (when metadata-pending stream-pending stream-sending) (next aborted) (receive abort)))))))
  
  (payloads
    ( (name select-request)
      (identifier "0a320756-b799-47dd-9455-685ca610ae62")
      (encoding protocol-buffers)
      (specification (message "SelectRequest") (proto-file "key-value-store-select.proto")))
    ( (name select-metadata)
      (identifier "d8b9be74-27b5-48c8-910c-6a615b88c7a3")
      (encoding protocol-buffers)
      (specification (message "SelectMetaData") (proto-file "key-value-store-select.proto")))
    ( (name select-stream-chunk)
      (identifier "5e8b3965-b82c-40cb-b832-5ea8f8e3ea97")
      (encoding protocol-buffers)
      (specification (message "SelectStreamChunk") (proto-file "key-value-store-select.proto"))))
)
Technologies
MessagePack
request ::= [type, msgid, method, params] reply ::= [type, msgid, error, result] notification ::= [type, method, params] type ::= 0 # request ::= 1 # response ::= 2 # notification msgid ::= 32-unsigned method ::= string params ::= array error, result ::= object | nil
BERT
-> {call, Module, Function, Arguments} <- {reply, Result} <- {error, {Type, Code, Class, Detail, Backtrace}} -> {cast, Module, Function, Arguments} <- {noreply} <> {info, Command, Options}
AMF
amf-packet ::= version header-count *(header-type) message-count *(message-type) header-type ::= header-name must-understand header-length value-type message-type ::= target-uri response-uri message-length value-type header-count, message-count, version ::= U16 must-understand ::= U8 header-length, message-length ::= U32 # -1 if unknown header-name, target-uri, response-uri ::= UTF-8
Hessian
top   ::= call
      ::= reply
call  ::= c x01 x00 header* method object* z
reply ::= r x01 x00 header* object z
      ::= r x01 x00 header* fault z
fault ::= f (object object)* z
Sun RPC
struct rpc_msg { unsigned int xid; union switch (msg_type mtype) { case CALL: call_body cbody; case REPLY: reply_body rbody; } body; }; struct call_body { unsigned int rpcvers; /* must be equal to two (2) */ unsigned int prog; unsigned int vers; unsigned int proc; opaque_auth cred; opaque_auth verf; /* procedure-specific parameters start here */ }; union reply_body switch (reply_stat stat) { case MSG_ACCEPTED: accepted_reply areply; case MSG_DENIED: rejected_reply rreply; }; struct accepted_reply { opaque_auth verf; union switch (accept_stat stat) { case SUCCESS: opaque results[0]; /* * procedure-specific results start here */ case PROG_MISMATCH: struct { unsigned int low; unsigned int high; } mismatch_info; default: /* * Void. Cases include PROG_UNAVAIL, PROC_UNAVAIL, * GARBAGE_ARGS, and SYSTEM_ERR. */ void; } reply_data; }; union rejected_reply switch (reject_stat stat) { case RPC_MISMATCH: struct { unsigned int low; unsigned int high; } mismatch_info; case AUTH_ERROR: auth_stat stat; };
Thrift
JSON-RPC
Request ::= { jsonrpc : "2.0", id : Number | String, method : String, [params : Structured,] } Response ::= { jsonrpc : "2.0", id : Number | String, [result : Any | error : Error,] } Error ::= { code : Number, message : String, [data : Any,] } Notification ::= { jsonrpc : "2.0", method : String, [params : Structured,] } BatchRequest ::= [Request | Notification, ...] BatchResponse ::= [Response, ...]
XML-RPC
<methodCall> <methodName>String</methodName> <params (?)> <param (*)><value>(value)</value></param> </params> </methodCall> <methodResponse> <params> <param><value>(value)</value></param> </params> </methodResponse> <methodResponse> <fault> <value>(...)</value> </fault> </methodResponse>
SOAP
<Envelope> <Header (?)> <(qname) role=(uri)(?) mustUnderstand=(boolean)(?) relay=(boolean)(?) (*)>...</(qname)> </Header> </Envelope> <Fault> <(code::=) Code> <Value>(qname)</Value> <Subcode (?)>(code)</Subcode> </Code> <Reason><Text (+)>(text)</Text></Reason> <Node (?)>(uri)</Node> <Role (?)>(uri)</Role> <Detail (?)>(any)</Detail> </Fault>