(mOSAIC) Interoperability

Design

Responsibilities

Features

Implementations

Version 1

Version 2

_bucket = ...
_channel = ...
_endpoint = ...
_connector = KvConnector (_bucket, _channel, _endpoint)

class KvConnector :
        
        def __init__ (self, _bucket, _channel, _endpoint) :
                self._active = False
                self._create_outcome = AsyncResult ()
                self._session = KvSession (self, _bucket, _channel, _endpoint)
                self._create_outcome.get ()
                self._create_outcome = None
        
        def _succeeded (self) :
                self._active = True
                self._connect_outcome.set (None)
        
        def _failed (self, _reason) :
                self._active = False
                if self._create_outcome is not None :
                        self._create_outcome.set_exception (Exception (_reason))
        
        def _terminated (self) :
                self._active = False
        
        def get (self, _key) :
                _outcome = AsyncResult ()
                def _callback (_succeeded, _valueOrReason) :
                        if _succeeded :
                                _outcome.set (_valueOrReason)
                        else :
                                _outcome.set_exception (Exception (_valueOrReason))
                self._session.get (_key, _callback)
                return _outcome.get ()
        
        def list (self) :
                _outcome = AsyncResult ()
                _queue = Queue ()
                def _callback (_succeeded, _keysOrReason) :
                        if _succeeded :
                                if _keysOrReason is not None :
                                        for _key in _keysOrReason :
                                                _queue.put (_key)
                                else :
                                        _outcome.set (None)
                                        _queue.put (StopIteration)
                        else :
                                _outcome.set_exception (Exception (_keysOrReason))
                                _queue.put (StopIteration)
                for _key in _queue :
                        yield _key
                _outcome.get ()
                return

class KvSession :
        
        def __init__ (self, _connector, _bucket, _channel, _endpoint) :
                self._state = 'new'
                self._connector = _connector
                self._channel = _channel
                self._session = None
                self._master_session_definition = self._channel.resolve_session_definition ("<<uuid>>")
                self._get_operation_session_definition = self._channel.resolve_session_definition ("<<uuid>>")
                self._list_operation_session_definition = self._channel.resolve_session_definition ("<<uuid>>")
                _message = KvAccessRequestMessage (_bucket)
                _channel_connector = self._channel.create_connector ()
                _channel_connector.connect (_endpoint, self._master_session_definition, self, _message)
                _channel_connector.destroy ()
        
        # part of the interoperability `SessionCallbacks` interface
        def created (self, _session, _message) :
                assert self._state == 'new'
                assert isinstance (_message, KvAccessRequestMessage)
                self._session = _session
                self._state = 'pending-access'
        
        # part of the interoperability `SessionCallbacks` interface
        def received (self, _session, _message) :
                if self._state == 'pending-access' :
                        if isinstance (_message, KvAccessGrantedMessage) :
                                self._state = 'connected'
                                self._connector._succeeded ()
                        elif isinstance (_message, KvAccessDeniedMessage) :
                                self._state = 'failed'
                                self._connector._failed ('access-denied')
                        else :
                                assert False
                elif self._state == 'connected' :
                        if isinstance (_message, KvTerminationMessage) :
                                self._state = 'terminated'
                                self._connector._terminated ()
                        else :
                                assert False
                else :
                        assert False
        
        # part of the interoperability `SessionCallbacks` interface
        def failed (self, _reason) :
                self._connector._failed ('driver-failure')
                self._state = 'failed'
        
        # delegate from `KvConnector`
        def get (self, _key, _callback) :
                assert self._state == 'connected'
                def _callback_wrapper (_succeeded, _message) :
                        if not _succeeded :
                                _callback (False, 'driver-failure')
                        if isinstance (_message, KvGetValueResponseMessage) :
                                _callback (True, _message.value)
                        elif isinstance (_message, KvGetErrorResponseMessage) :
                                _callback (False, _errors[_message.reason])
                        else :
                                _callback (False, 'driver-failure')
                _callbacks = KvGenericOperationSession (_callback_wrapper)
                _message = KvGetRequestMessage (_key)
                self._session.sendAndSpawn (self._get_operation_session_definition, _callbacks, _message)
        
        # delegate from `KvConnector`
        def list (self, _callback) :
                assert self._state == 'connected'
                _callbacks = KvListOperationSession (_callback)
                _message = KvListRequestMessage ()
                self._session.sendAndSpawn (self._list_operation_session_definition, _callbacks, _message)

class KvListOperationSession :
        
        def __init__ (self, _callback) :
                self._state = 'new'
                self._callback = _callback
        
        def created (self, _session, _message) :
                assert self._state == 'new'
                self._state = 'pending'
        
        def received (self, _session, _message) :
                assert self._state == 'pending'
                if isinstance (_message, KvListKeysResponseMessage) :
                        self._callback (True, _message.keys)
                elif isinstance (_message, KvListEndResponseMessage) :
                        self._callback (True, None)
                elif isinstance (_message, KvListErrorResponseMessage) :
                        self._callback (False, _errors[_message.reason])
                        self._state = 'completed'
                else :
                        assert False
        
        def failed (self, _session, _reason) :
                assert self._state == 'pending'
                self._state = 'failed'
                self._callback (False, 'driver-failure')

class KvGenericOperationSession :
        
        def __init__ (self, _callback) :
                self._state = 'new'
                self._callback = _callback
        
        def created (self, _session, _message) :
                assert self._state == 'new'
                self._state = 'pending'
        
        def received (self, _session, _message) :
                assert self._state = 'pending'
                self._state = 'completed'
                self._callback (True, _message)
        
        def failed (self, _session, _reason) :
                assert self._state = 'pending'
                self._state = 'failed'
                self._callback (False, _reason)

Proposals

Proposal A

Features:

Message types:

Message encoding:

message ::= {
    type :: enumeration,
    case type of
        session-initiation-req, session-initiation-ack ->
            initiator :: endpoint,
            target :: endpoint,
            session;
        session-termination-req, session-termination-ack ->
            session;
        session-exchange ->
            session;
        session-operation-initiation | session-notification-initiation ->
            session,
            operation,
            method,
            payload;
        session-operation-termination | session-notification-termination ->
            session,
            operation;
        session-operation-exchange | session-notification-exchange ->
            session,
            operation,
            method,
            payload;
        standalone-operation-initiation | standalone-notification-initiation ->
            initiator :: endpoint,
            target :: endpoint,
            operation,
            method,
            payload;
        standalone-operation-termination | standalone-notification-termination ->
            initiator :: endpoint,
            operation;
        standalone-operation-exchange | standalone-notification-termination ->
            initiator :: endpoint,
            operation,
            method,
            payload
    end
}

end-point ::= universally-unique-identifier
session ::= universally-unique-identifier
operation ::= scoped-unique-identifier(session|initiator)

method ::= {
    identifier :: universally-unique-identifier,
    version :: scoped-unique-identifier(identifier)
}

payload ::= {
    encoding :: universally-unique-identifier,
    data :: length-prefixed-binary-string
}

Examples

Specification example:

# key-value put operation, stateless
...

Proposal B

Features:

Message types:

Message encoding:

message ::= {
    type :: enumeration,
    case type of
        master-session-initiation-request ->
            initiator :: end-point,
            target :: end-point;
        nested-session-initiation-request ->
            nesting-session;
        {master,nested}-session-initiation-request ->
            protocol;
        {master,nested}-session-{initiation,termination}-{request,acknowledgment}, callback-message ->
            session;
        callback-message ->
            % void
    end,
    payload-count :: count,
    payload *
}

end-point, session, nesting-session, protocol, method ::= universally-unique-identifier

payload ::= {
    semantic :: universally-unique-identifier,
    role :: universally-unique-identifier,
    mandatory :: boolean,
    encoding :: universally-unique-identifier,
    schema :: scoped-unique-identifier(method),
    data :: length-prefixed-binary-string
}

Examples

Simple (incomplete) example

(session file-system (id 795e2d88-a5da-4194-a4fe-4d8bcee3ccf8)
    
    (roles
        (client (id f9b7ec52-9a02-4f78-93f5-abb242fb33dd))
        (server (id 66b6b982-1e14-429b-80c9-0f8cf927910a)))
    
    (messages
        
        (access-request
            (type master-session-initiation-request)
            (payloads
                (access-request-payload
                    (semantic a3a7c717-266d-4926-8a37-5b1281aab55e)
                    (role server)
                    (mandatory true)
                    (data
                        (encoding protocol-buffers)
                        (schema a3a7c717-266d-4926-8a37-5b1281aab55e) % the same as the semantic
                        (protocol-buffers-schema @{
                            ... protocol buffers specific specification
                        })))))
        
        (access-granted ...)
        (close-request ...)
        (close-acknowledge ...)
    )
    
    (states
        (access (type initial) (roles client server))
        (ready (type intermediary) (roles client server))
        (closed (type final) (roles client server)))
    
    (transitions
        ((role client) (from void) (to access) (send access-request))
        ((role server) (from void) (to access) (receive access-request))
        ((role server) (from access-request) (to read) (send access-granted))
        ((role server) (from access-request) (to closed) (send access-refused))
        ((role client) (from access) (to ready) (receive access-granted))
        ((role client) (from access) (to closed) (receive access-refused))
    )
)

More complex example

#lang s-exp "protocol.rkt"

(protocol
        (name connection)
                ; The protocol identifier should be an unique, opaque and constant UUID used to internally mark packets on the wire.
                ; It should be changed when making any backward incompatible changes. (This means almost always before a stable release.)
        (identifier "e7b45267-da3c-4954-b83e-c0630ae4bdef")
                ; Inspired from SOAP roles.
                ; Usually there are only two roles; but there are cases where we'll describe related protocol sessions (connections)
                ; involving multiple roles. (For example in message queues we have `publisher`, `queue/broker`, and `consumer`.)
        (roles
                (
                        (name client)
                        (identifier "d6954b86-a74e-4e72-b026-366c13878125"))
                (
                        (name server)
                        (identifier "7331397f-e4a4-4010-910c-2f2c05ccc4e4")))
        (sessions
                (
                        (name connection)
                        (identifier "f9a936a5-f09e-4b32-b713-1c54c31422d9")
                        (type master)
                        (roles client server)
                        (messages
                                        ; A message is characterized by a session life-cycle related type and a payload (opaque from this point of view).
                                        ; The session life-cycle related types are:
                                        ;  * `initiation` -- sent / received by a peer in order to establish a session; (either independent or nested;)
                                        ;  * `exchange` -- sent / received by any peer during the life-time of the session;
                                        ;  * `termination` -- sent / received by any peer to terminate the session both ways;
                                        ;   (no further message is allowed in any direction after this;)
                                        ; The payload is application related and has no other meaning for the protocol specification (except encoding).
                                ((name access-request) (type initiation))
                                ((name access-accept) (type exchange))
                                ((name access-reject) (type termination))
                                ((name select-request) (type initiation) (payload select-request))
                                ((name close) (type termination))
                                ((name abort) (type termination)))
                        (exchanges
                                (
                                        (role client)
                                        (states
                                                        ; Each session should have at least one transition from an `initial` state; at least one transition
                                                        ; to a `terminal` state; the `intermediary` states could be missing.
                                                        ; When a session is created it is automatically put in one of these `initial` states.
                                                (initial created)
                                                (intermediary access-pending ready)
                                                (terminal closed aborted))
                                        (transitions
                                                ((name access-requesting) (when created) (next access-pending)
                                                        (send access-request))
                                                ((name access-accepted) (when access-pending) (next ready)
                                                        (receive access-accept))
                                                ((name access-rejected) (when access-pending) (next closed)
                                                        (receive access-reject))
                                                ((name disconnected) (when ready) (next closed)
                                                        (send close))
                                                ((name select-requesting) (when ready)
                                                        (send select-request)
                                                                ; The `select` is one example of an operation which is does not follow the `request/reply` pattern;
                                                                ; thus we create a nested session of type `select-operation` placing us in the `client` role,
                                                                ; with the initial state `pending`.
                                                        (spawn select-operation (role client) (state pending))))
                                                ((name self-aborted) (when access-pending ready) (next aborted)
                                                        (send abort))
                                                ((name peer-aborted) (when access-pending ready) (next aborted)
                                                        (receive abort)))
                                (
                                        (role server)
                                        (states
                                                (initial created)
                                                (intermediary access-pending ready)
                                                (terminal closed aborted))
                                        (transitions
                                                ((name access-requested) (when created) (next access-pending)
                                                        (receive access-request))
                                                ((name access-accepted) (when access-pending) (next accessing)
                                                        (send access-accept))
                                                ((name access-rejected) (when access-pending) (next closed)
                                                        (send access-reject))
                                                ((name disconnected) (when ready) (next closed)
                                                        (receive close))
                                                ((name select-requested) (when ready)
                                                        (receive select-request)
                                                        (spawn select-operation (role server) (state pending)))
                                                ((name self-aborted) (when access-pending ready) (next aborted)
                                                        (send abort))
                                                ((name peer-aborted) (when access-pending ready) (next aborted)
                                                        (receive abort))))))
                (
                        (name select-operation)
                        (identifier "089247c8-60ca-46bd-9ddc-303907c317a1")
                        (type nested)
                        (parent connection)
                        (roles client server)
                        (messages
                                ((name select-metadata) (type exchange) (payload select-metadata))
                                ((name select-stream-begin) (type exchange))
                                ((name select-stream-chunk) (type exchange) (payload select-stream-chunk))
                                ((name select-stream-end) (type termination))
                                ((name close) (type termination)))
                                ((name abort) (type termination)))
                        (exchanges
                                ; This `select` operation has the following life-cycle:
                                ;  * initially the client sends the `select` (`get`) request, waiting for the meta-data of the associated value;
                                ;  * if the client only wanted to access the meta-data (like size, replicas, etc.) it finishes the operation here;
                                ;  * if however the client also wants to receive the data in requests the beginning of the data stream;
                                ;  * following, as the associated data could be several GB in size, the server dosen't send it all in one message,
                                ;   but sends multiple chunks to the client as separate messages;
                                ;  * when the server has no data to send it ends the stream with a special message;
                                ; Observations:
                                ;  * we could have added some options inside the `select-stream-begin` message to specify the part of the
                                ;   entire data we are interested in; (like HTTP chunked requests;)
                                (
                                        (role client)
                                        (states
                                                (initial metadata-pending)
                                                (intermediary stream-pending stream-receiving)
                                                (terminal closed aborted))
                                        (transitions
                                                ((name metadata-received) (when metadata-pending) (next stream-pending)
                                                        (receive select-metadata))
                                                ((name stream-beginning) (when stream-pending) (next stream-receiving)
                                                        (send select-stream-begin))
                                                ((name closed) (when stream-pending) (next closed)
                                                        (send close))
                                                ((name stream-received) (when stream-receiving)
                                                        (receive select-stream-chunk))
                                                ((name stream-ended) (when data-receiving) (next closed)
                                                        (receive select-end))
                                                ((name operation-aborting) (when metadata-pending stream-pending stream-receiving) (next aborted)
                                                        (send abort))
                                                ((name operation-aborted) (when metadata-pending stream-pending stream-receiving) (next aborted)
                                                        (receive abort))))
                                (
                                        (role server)
                                        (states
                                                (initial metadata-pending)
                                                (intermediary stream-pending stream-sending)
                                                (terminal closed aborted))
                                        (transitions
                                                ((name metadata-sent) (when metadata-pending) (next stream-pending)
                                                        (send select-metadata))
                                                ((name stream-beginning) (when stream-pending) (next stream-sending)
                                                        (receive select-stream-begin))
                                                ((name closed) (when stream-pending) (next closed)
                                                        (receive close))
                                                ((name stream-sent) (when stream-sending)
                                                        (send select-stream-chunk))
                                                ((name stream-ended) (when stream-sending) (next closed)
                                                        (send select-stream-end))
                                                ((name operation-aborting) (when metadata-pending stream-pending stream-receiving) (next aborted)
                                                        (send abort))
                                                ((name operation-aborted) (when metadata-pending stream-pending stream-receiving) (next aborted)
                                                        (receive abort))))))
        (payloads
                (
                        (name select-request)
                        (identifier "0a320756-b799-47dd-9455-685ca610ae62")
                        (encoding protocol-buffers)
                        (specification
                                (message "SelectRequest")
                                (proto-file "key-value-store-select.proto")))
                (
                        (name select-metadata)
                        (identifier "d8b9be74-27b5-48c8-910c-6a615b88c7a3")
                        (encoding protocol-buffers)
                        (specification
                                (message "SelectMetaData")
                                (proto-file "key-value-store-select.proto")))
                (
                        (name select-stream-chunk)
                        (identifier "5e8b3965-b82c-40cb-b832-5ea8f8e3ea97")
                        (encoding protocol-buffers)
                        (specification
                                (message "SelectStreamChunk")
                                (proto-file "key-value-store-select.proto"))))
)

Technologies

MessagePack

request ::= [type, msgid, method, params]
reply ::= [type, msgid, error, result]
notification ::= [type, method, params]

type ::= 0 # request
     ::= 1 # response
     ::= 2 # notification

msgid ::= 32-unsigned
method ::= string
params ::= array
error, result ::= object | nil

BERT

-> {call, Module, Function, Arguments}
<- {reply, Result}
<- {error, {Type, Code, Class, Detail, Backtrace}}

-> {cast, Module, Function, Arguments}
<- {noreply}

<> {info, Command, Options}

AMF

amf-packet ::= version header-count *(header-type) message-count *(message-type)
header-type ::= header-name must-understand header-length value-type
message-type ::= target-uri response-uri message-length value-type

header-count, message-count, version ::= U16
must-understand ::= U8
header-length, message-length ::= U32 # -1 if unknown
header-name, target-uri, response-uri ::= UTF-8

Hessian

top     ::= call
        ::= replycall
        ::= c x01 x00 header* methodobject* z

reply   ::= r x01 x00 header* object z
        ::= r x01 x00 header* fault z

fault   ::= f (objectobject)* z

Sun RPC

struct rpc_msg {
    unsigned int xid;
    union switch (msg_type mtype) {
        case CALL:
             call_body cbody;
        case REPLY:
             reply_body rbody;
    } body;
};

struct call_body {
    unsigned int rpcvers;       /* must be equal to two (2) */
    unsigned int prog;
    unsigned int vers;
    unsigned int proc;
    opaque_auth cred;
    opaque_auth verf;
    /* procedure-specific parameters start here */
};

union reply_body switch (reply_stat stat) {
    case MSG_ACCEPTED:
        accepted_reply areply;
    case MSG_DENIED:
        rejected_reply rreply;
};

struct accepted_reply {
    opaque_auth verf;
    union switch (accept_stat stat) {
        case SUCCESS:
            opaque results[0];
            /*
             * procedure-specific results start here
             */
        case PROG_MISMATCH:
            struct {
                unsigned int low;
                unsigned int high;
            } mismatch_info;
        default:
            /*
             * Void.  Cases include PROG_UNAVAIL, PROC_UNAVAIL,
             * GARBAGE_ARGS, and SYSTEM_ERR.
             */
            void;
    } reply_data;
};

union rejected_reply switch (reject_stat stat) {
    case RPC_MISMATCH:
        struct {
            unsigned int low;
            unsigned int high;
        } mismatch_info;
    case AUTH_ERROR:
        auth_stat stat;
};

Thrift

JSON-RPC

Request ::= {
    jsonrpc : "2.0",
    id : Number | String,
    method : String,
    [params : Structured,]
}

Response ::= {
    jsonrpc : "2.0",
    id : Number | String,
    [result : Any | error : Error,]
}

Error ::= {
    code : Number,
    message : String,
    [data : Any,]
}

Notification ::= {
    jsonrpc : "2.0",
    method : String,
    [params : Structured,]
}

BatchRequest ::= [Request | Notification, ...]
BatchResponse ::= [Response, ...]

XML-RPC

<methodCall>
    <methodName>String</methodName>
    <params (?)>
        <param (*)><value>(value)</value></param>
    </params>
</methodCall>

<methodResponse>
    <params>
        <param><value>(value)</value></param>
    </params>
</methodResponse>

<methodResponse>
    <fault>
        <value>(...)</value>
    </fault>
</methodResponse>

SOAP

<Envelope>
    <Header (?)>
        <(qname) role=(uri)(?) mustUnderstand=(boolean)(?) relay=(boolean)(?) (*)>...</(qname)>
    </Header>
</Envelope>

<Fault>
    <(code::=) Code>
        <Value>(qname)</Value>
        <Subcode (?)>(code)</Subcode>
    </Code>
    <Reason><Text (+)>(text)</Text></Reason>
    <Node (?)>(uri)</Node>
    <Role (?)>(uri)</Role>
    <Detail (?)>(any)</Detail>
</Fault>

Miscellaneous references