(mOSAIC) Interoperability
Contents
Design
Responsibilities
- encapsulation -- yes;
- addressing -- yes;
- sequencing -- no;
- segmentation / reassembly -- no;
- connection control -- no;
- flow control -- no;
- error control -- no;
- multiplexing -- yes;
- authentication -- no;
- authorization -- no;
- encryption -- no;
- compression -- no;
Features
- state-full vs. state-less (i.e. the notion of session) (maybe hierarchical / nested sessions);
- pipelining vs. request-reply (i.e. multiple outstanding requests);
- (in the case of pipelining) synchronous vs. asynchronous (i.e. out of order replies);
- single request-reply semantic vs. state-machine;
- global vs. connection uniqueness (i.e. call identifier, agent addresses, etc.);
- transparent vs. opaque encoding;
- unidirectional vs. bidirectional (i.e. can the server also initiate requests);
- with / without "notifications" (i.e. requests without replies);
- with / without "headers" (like SOAP headers);
Implementations
Version 1
- Java implementation:
- Erlang implementation:
Version 2
_bucket = ... _channel = ... _endpoint = ... _connector = KvConnector (_bucket, _channel, _endpoint)
class KvConnector :
def __init__ (self, _bucket, _channel, _endpoint) :
self._active = False
self._create_outcome = AsyncResult ()
self._session = KvSession (self, _bucket, _channel, _endpoint)
self._create_outcome.get ()
self._create_outcome = None
def _succeeded (self) :
self._active = True
self._connect_outcome.set (None)
def _failed (self, _reason) :
self._active = False
if self._create_outcome is not None :
self._create_outcome.set_exception (Exception (_reason))
def _terminated (self) :
self._active = False
def get (self, _key) :
_outcome = AsyncResult ()
def _callback (_succeeded, _valueOrReason) :
if _succeeded :
_outcome.set (_valueOrReason)
else :
_outcome.set_exception (Exception (_valueOrReason))
self._session.get (_key, _callback)
return _outcome.get ()
def list (self) :
_outcome = AsyncResult ()
_queue = Queue ()
def _callback (_succeeded, _keysOrReason) :
if _succeeded :
if _keysOrReason is not None :
for _key in _keysOrReason :
_queue.put (_key)
else :
_outcome.set (None)
_queue.put (StopIteration)
else :
_outcome.set_exception (Exception (_keysOrReason))
_queue.put (StopIteration)
for _key in _queue :
yield _key
_outcome.get ()
returnclass KvSession :
def __init__ (self, _connector, _bucket, _channel, _endpoint) :
self._state = 'new'
self._connector = _connector
self._channel = _channel
self._session = None
self._master_session_definition = self._channel.resolve_session_definition ("<<uuid>>")
self._get_operation_session_definition = self._channel.resolve_session_definition ("<<uuid>>")
self._list_operation_session_definition = self._channel.resolve_session_definition ("<<uuid>>")
_message = KvAccessRequestMessage (_bucket)
_channel_connector = self._channel.create_connector ()
_channel_connector.connect (_endpoint, self._master_session_definition, self, _message)
_channel_connector.destroy ()
# part of the interoperability `SessionCallbacks` interface
def created (self, _session, _message) :
assert self._state == 'new'
assert isinstance (_message, KvAccessRequestMessage)
self._session = _session
self._state = 'pending-access'
# part of the interoperability `SessionCallbacks` interface
def received (self, _session, _message) :
if self._state == 'pending-access' :
if isinstance (_message, KvAccessGrantedMessage) :
self._state = 'connected'
self._connector._succeeded ()
elif isinstance (_message, KvAccessDeniedMessage) :
self._state = 'failed'
self._connector._failed ('access-denied')
else :
assert False
elif self._state == 'connected' :
if isinstance (_message, KvTerminationMessage) :
self._state = 'terminated'
self._connector._terminated ()
else :
assert False
else :
assert False
# part of the interoperability `SessionCallbacks` interface
def failed (self, _reason) :
self._connector._failed ('driver-failure')
self._state = 'failed'
# delegate from `KvConnector`
def get (self, _key, _callback) :
assert self._state == 'connected'
def _callback_wrapper (_succeeded, _message) :
if not _succeeded :
_callback (False, 'driver-failure')
if isinstance (_message, KvGetValueResponseMessage) :
_callback (True, _message.value)
elif isinstance (_message, KvGetErrorResponseMessage) :
_callback (False, _errors[_message.reason])
else :
_callback (False, 'driver-failure')
_callbacks = KvGenericOperationSession (_callback_wrapper)
_message = KvGetRequestMessage (_key)
self._session.sendAndSpawn (self._get_operation_session_definition, _callbacks, _message)
# delegate from `KvConnector`
def list (self, _callback) :
assert self._state == 'connected'
_callbacks = KvListOperationSession (_callback)
_message = KvListRequestMessage ()
self._session.sendAndSpawn (self._list_operation_session_definition, _callbacks, _message)
class KvListOperationSession :
def __init__ (self, _callback) :
self._state = 'new'
self._callback = _callback
def created (self, _session, _message) :
assert self._state == 'new'
self._state = 'pending'
def received (self, _session, _message) :
assert self._state == 'pending'
if isinstance (_message, KvListKeysResponseMessage) :
self._callback (True, _message.keys)
elif isinstance (_message, KvListEndResponseMessage) :
self._callback (True, None)
elif isinstance (_message, KvListErrorResponseMessage) :
self._callback (False, _errors[_message.reason])
self._state = 'completed'
else :
assert False
def failed (self, _session, _reason) :
assert self._state == 'pending'
self._state = 'failed'
self._callback (False, 'driver-failure')
class KvGenericOperationSession :
def __init__ (self, _callback) :
self._state = 'new'
self._callback = _callback
def created (self, _session, _message) :
assert self._state == 'new'
self._state = 'pending'
def received (self, _session, _message) :
assert self._state = 'pending'
self._state = 'completed'
self._callback (True, _message)
def failed (self, _session, _reason) :
assert self._state = 'pending'
self._state = 'failed'
self._callback (False, _reason)Proposals
Proposal A
Features:
- state-full and state-less (i.e. we can choose to have sessions);
- asynchronous pipelining;
- bidirectional;
- state-machine for both session and operation;
- with "notifications";
Message types:
- categorized by scope:
- state-full:
- session messages;
- operation messages;
- notification messages;
- state-less:
- operation messages;
- notification messages;
- state-full:
- categorized by semantic:
- session messages:
initiation request (initiator -> target);
initiation acknowledgment (target -> initiator);
termination request (any -> any);
termination acknowledgment (any -> any);
exchange (any -> any);
- session / stand-alone operation messages:
initiation (initiator -> target);
termination (target -> initiator);
exchange (any -> any);
- session / stand-alone notification messages:
initiation (initiator -> target);
termination (target -> initiator);
exchange (initiator -> target);
- session messages:
Message encoding:
message ::= {
type :: enumeration,
case type of
session-initiation-req, session-initiation-ack ->
initiator :: endpoint,
target :: endpoint,
session;
session-termination-req, session-termination-ack ->
session;
session-exchange ->
session;
session-operation-initiation | session-notification-initiation ->
session,
operation,
method,
payload;
session-operation-termination | session-notification-termination ->
session,
operation;
session-operation-exchange | session-notification-exchange ->
session,
operation,
method,
payload;
standalone-operation-initiation | standalone-notification-initiation ->
initiator :: endpoint,
target :: endpoint,
operation,
method,
payload;
standalone-operation-termination | standalone-notification-termination ->
initiator :: endpoint,
operation;
standalone-operation-exchange | standalone-notification-termination ->
initiator :: endpoint,
operation,
method,
payload
end
}
end-point ::= universally-unique-identifier
session ::= universally-unique-identifier
operation ::= scoped-unique-identifier(session|initiator)
method ::= {
identifier :: universally-unique-identifier,
version :: scoped-unique-identifier(identifier)
}
payload ::= {
encoding :: universally-unique-identifier,
data :: length-prefixed-binary-string
}Examples
Specification example:
inspired from Getting Erlang to talk to the outside world;
# key-value put operation, stateless ...
Proposal B
Features:
- nested sessions;
- global symmetric (i.e. both parties can play the role of session initiator and target, if the state transitions allow it);
- session asymmetric (i.e. in the scope of a particular session one peer is the initiator and the other is the target);
- multi-layered state machines (i.e. states transitions in a session machine depends on states in the parent session machine);
- asynchronous pipelining;
- unifying the concept of operations and notifications (i.e. the concept of operations from proposal A, is transformed into a child session with exchanges inside;)
- normal messages can be piggy-backed on session control messages (i.e. a request-reply operation needs a session, but the request message both creates the session, and at the same time conveys the method and payload; the response the same, it conveys both the response and the session termination);
- partial ordering (i.e. multiple messages sent in nesting sessions are received in the same order, but messages in sibling sessions can have any ordering);
- supports tagging (i.e. SOAP-like headers);
Message types:
- session control:
initiation request (initiator -> target);
termination request (target -> initiator);
- callbacks;
Message encoding:
message ::= {
type :: enumeration,
case type of
master-session-initiation-request ->
initiator :: end-point,
target :: end-point;
nested-session-initiation-request ->
nesting-session;
{master,nested}-session-initiation-request ->
protocol;
{master,nested}-session-{initiation,termination}-{request,acknowledgment}, callback-message ->
session;
callback-message ->
% void
end,
payload-count :: count,
payload *
}
end-point, session, nesting-session, protocol, method ::= universally-unique-identifier
payload ::= {
semantic :: universally-unique-identifier,
role :: universally-unique-identifier,
mandatory :: boolean,
encoding :: universally-unique-identifier,
schema :: scoped-unique-identifier(method),
data :: length-prefixed-binary-string
}Examples
Simple (incomplete) example
(session file-system (id 795e2d88-a5da-4194-a4fe-4d8bcee3ccf8)
(roles
(client (id f9b7ec52-9a02-4f78-93f5-abb242fb33dd))
(server (id 66b6b982-1e14-429b-80c9-0f8cf927910a)))
(messages
(access-request
(type master-session-initiation-request)
(payloads
(access-request-payload
(semantic a3a7c717-266d-4926-8a37-5b1281aab55e)
(role server)
(mandatory true)
(data
(encoding protocol-buffers)
(schema a3a7c717-266d-4926-8a37-5b1281aab55e) % the same as the semantic
(protocol-buffers-schema @{
... protocol buffers specific specification
})))))
(access-granted ...)
(close-request ...)
(close-acknowledge ...)
)
(states
(access (type initial) (roles client server))
(ready (type intermediary) (roles client server))
(closed (type final) (roles client server)))
(transitions
((role client) (from void) (to access) (send access-request))
((role server) (from void) (to access) (receive access-request))
((role server) (from access-request) (to read) (send access-granted))
((role server) (from access-request) (to closed) (send access-refused))
((role client) (from access) (to ready) (receive access-granted))
((role client) (from access) (to closed) (receive access-refused))
)
)More complex example
#lang s-exp "protocol.rkt"
(protocol
(name connection)
; The protocol identifier should be an unique, opaque and constant UUID used to internally mark packets on the wire.
; It should be changed when making any backward incompatible changes. (This means almost always before a stable release.)
(identifier "e7b45267-da3c-4954-b83e-c0630ae4bdef")
; Inspired from SOAP roles.
; Usually there are only two roles; but there are cases where we'll describe related protocol sessions (connections)
; involving multiple roles. (For example in message queues we have `publisher`, `queue/broker`, and `consumer`.)
(roles
(
(name client)
(identifier "d6954b86-a74e-4e72-b026-366c13878125"))
(
(name server)
(identifier "7331397f-e4a4-4010-910c-2f2c05ccc4e4")))
(sessions
(
(name connection)
(identifier "f9a936a5-f09e-4b32-b713-1c54c31422d9")
(type master)
(roles client server)
(messages
; A message is characterized by a session life-cycle related type and a payload (opaque from this point of view).
; The session life-cycle related types are:
; * `initiation` -- sent / received by a peer in order to establish a session; (either independent or nested;)
; * `exchange` -- sent / received by any peer during the life-time of the session;
; * `termination` -- sent / received by any peer to terminate the session both ways;
; (no further message is allowed in any direction after this;)
; The payload is application related and has no other meaning for the protocol specification (except encoding).
((name access-request) (type initiation))
((name access-accept) (type exchange))
((name access-reject) (type termination))
((name select-request) (type initiation) (payload select-request))
((name close) (type termination))
((name abort) (type termination)))
(exchanges
(
(role client)
(states
; Each session should have at least one transition from an `initial` state; at least one transition
; to a `terminal` state; the `intermediary` states could be missing.
; When a session is created it is automatically put in one of these `initial` states.
(initial created)
(intermediary access-pending ready)
(terminal closed aborted))
(transitions
((name access-requesting) (when created) (next access-pending)
(send access-request))
((name access-accepted) (when access-pending) (next ready)
(receive access-accept))
((name access-rejected) (when access-pending) (next closed)
(receive access-reject))
((name disconnected) (when ready) (next closed)
(send close))
((name select-requesting) (when ready)
(send select-request)
; The `select` is one example of an operation which is does not follow the `request/reply` pattern;
; thus we create a nested session of type `select-operation` placing us in the `client` role,
; with the initial state `pending`.
(spawn select-operation (role client) (state pending))))
((name self-aborted) (when access-pending ready) (next aborted)
(send abort))
((name peer-aborted) (when access-pending ready) (next aborted)
(receive abort)))
(
(role server)
(states
(initial created)
(intermediary access-pending ready)
(terminal closed aborted))
(transitions
((name access-requested) (when created) (next access-pending)
(receive access-request))
((name access-accepted) (when access-pending) (next accessing)
(send access-accept))
((name access-rejected) (when access-pending) (next closed)
(send access-reject))
((name disconnected) (when ready) (next closed)
(receive close))
((name select-requested) (when ready)
(receive select-request)
(spawn select-operation (role server) (state pending)))
((name self-aborted) (when access-pending ready) (next aborted)
(send abort))
((name peer-aborted) (when access-pending ready) (next aborted)
(receive abort))))))
(
(name select-operation)
(identifier "089247c8-60ca-46bd-9ddc-303907c317a1")
(type nested)
(parent connection)
(roles client server)
(messages
((name select-metadata) (type exchange) (payload select-metadata))
((name select-stream-begin) (type exchange))
((name select-stream-chunk) (type exchange) (payload select-stream-chunk))
((name select-stream-end) (type termination))
((name close) (type termination)))
((name abort) (type termination)))
(exchanges
; This `select` operation has the following life-cycle:
; * initially the client sends the `select` (`get`) request, waiting for the meta-data of the associated value;
; * if the client only wanted to access the meta-data (like size, replicas, etc.) it finishes the operation here;
; * if however the client also wants to receive the data in requests the beginning of the data stream;
; * following, as the associated data could be several GB in size, the server dosen't send it all in one message,
; but sends multiple chunks to the client as separate messages;
; * when the server has no data to send it ends the stream with a special message;
; Observations:
; * we could have added some options inside the `select-stream-begin` message to specify the part of the
; entire data we are interested in; (like HTTP chunked requests;)
(
(role client)
(states
(initial metadata-pending)
(intermediary stream-pending stream-receiving)
(terminal closed aborted))
(transitions
((name metadata-received) (when metadata-pending) (next stream-pending)
(receive select-metadata))
((name stream-beginning) (when stream-pending) (next stream-receiving)
(send select-stream-begin))
((name closed) (when stream-pending) (next closed)
(send close))
((name stream-received) (when stream-receiving)
(receive select-stream-chunk))
((name stream-ended) (when data-receiving) (next closed)
(receive select-end))
((name operation-aborting) (when metadata-pending stream-pending stream-receiving) (next aborted)
(send abort))
((name operation-aborted) (when metadata-pending stream-pending stream-receiving) (next aborted)
(receive abort))))
(
(role server)
(states
(initial metadata-pending)
(intermediary stream-pending stream-sending)
(terminal closed aborted))
(transitions
((name metadata-sent) (when metadata-pending) (next stream-pending)
(send select-metadata))
((name stream-beginning) (when stream-pending) (next stream-sending)
(receive select-stream-begin))
((name closed) (when stream-pending) (next closed)
(receive close))
((name stream-sent) (when stream-sending)
(send select-stream-chunk))
((name stream-ended) (when stream-sending) (next closed)
(send select-stream-end))
((name operation-aborting) (when metadata-pending stream-pending stream-receiving) (next aborted)
(send abort))
((name operation-aborted) (when metadata-pending stream-pending stream-receiving) (next aborted)
(receive abort))))))
(payloads
(
(name select-request)
(identifier "0a320756-b799-47dd-9455-685ca610ae62")
(encoding protocol-buffers)
(specification
(message "SelectRequest")
(proto-file "key-value-store-select.proto")))
(
(name select-metadata)
(identifier "d8b9be74-27b5-48c8-910c-6a615b88c7a3")
(encoding protocol-buffers)
(specification
(message "SelectMetaData")
(proto-file "key-value-store-select.proto")))
(
(name select-stream-chunk)
(identifier "5e8b3965-b82c-40cb-b832-5ea8f8e3ea97")
(encoding protocol-buffers)
(specification
(message "SelectStreamChunk")
(proto-file "key-value-store-select.proto"))))
)Technologies
MessagePack
request ::= [type, msgid, method, params]
reply ::= [type, msgid, error, result]
notification ::= [type, method, params]
type ::= 0 # request
::= 1 # response
::= 2 # notification
msgid ::= 32-unsigned
method ::= string
params ::= array
error, result ::= object | nilBERT
-> {call, Module, Function, Arguments}
<- {reply, Result}
<- {error, {Type, Code, Class, Detail, Backtrace}}
-> {cast, Module, Function, Arguments}
<- {noreply}
<> {info, Command, Options}AMF
amf-packet ::= version header-count *(header-type) message-count *(message-type) header-type ::= header-name must-understand header-length value-type message-type ::= target-uri response-uri message-length value-type header-count, message-count, version ::= U16 must-understand ::= U8 header-length, message-length ::= U32 # -1 if unknown header-name, target-uri, response-uri ::= UTF-8
Hessian
top ::= call
::= replycall
::= c x01 x00 header* methodobject* z
reply ::= r x01 x00 header* object z
::= r x01 x00 header* fault z
fault ::= f (objectobject)* zSun RPC
struct rpc_msg {
unsigned int xid;
union switch (msg_type mtype) {
case CALL:
call_body cbody;
case REPLY:
reply_body rbody;
} body;
};
struct call_body {
unsigned int rpcvers; /* must be equal to two (2) */
unsigned int prog;
unsigned int vers;
unsigned int proc;
opaque_auth cred;
opaque_auth verf;
/* procedure-specific parameters start here */
};
union reply_body switch (reply_stat stat) {
case MSG_ACCEPTED:
accepted_reply areply;
case MSG_DENIED:
rejected_reply rreply;
};
struct accepted_reply {
opaque_auth verf;
union switch (accept_stat stat) {
case SUCCESS:
opaque results[0];
/*
* procedure-specific results start here
*/
case PROG_MISMATCH:
struct {
unsigned int low;
unsigned int high;
} mismatch_info;
default:
/*
* Void. Cases include PROG_UNAVAIL, PROC_UNAVAIL,
* GARBAGE_ARGS, and SYSTEM_ERR.
*/
void;
} reply_data;
};
union rejected_reply switch (reject_stat stat) {
case RPC_MISMATCH:
struct {
unsigned int low;
unsigned int high;
} mismatch_info;
case AUTH_ERROR:
auth_stat stat;
};Thrift
JSON-RPC
Request ::= {
jsonrpc : "2.0",
id : Number | String,
method : String,
[params : Structured,]
}
Response ::= {
jsonrpc : "2.0",
id : Number | String,
[result : Any | error : Error,]
}
Error ::= {
code : Number,
message : String,
[data : Any,]
}
Notification ::= {
jsonrpc : "2.0",
method : String,
[params : Structured,]
}
BatchRequest ::= [Request | Notification, ...]
BatchResponse ::= [Response, ...]XML-RPC
<methodCall>
<methodName>String</methodName>
<params (?)>
<param (*)><value>(value)</value></param>
</params>
</methodCall>
<methodResponse>
<params>
<param><value>(value)</value></param>
</params>
</methodResponse>
<methodResponse>
<fault>
<value>(...)</value>
</fault>
</methodResponse>SOAP
<Envelope>
<Header (?)>
<(qname) role=(uri)(?) mustUnderstand=(boolean)(?) relay=(boolean)(?) (*)>...</(qname)>
</Header>
</Envelope>
<Fault>
<(code::=) Code>
<Value>(qname)</Value>
<Subcode (?)>(code)</Subcode>
</Code>
<Reason><Text (+)>(text)</Text></Reason>
<Node (?)>(uri)</Node>
<Role (?)>(uri)</Role>
<Detail (?)>(any)</Detail>
</Fault>