]> git.corax.cc Git - toolbox/commitdiff
include/ipc: Use endpoints to implement publish-subscribe functions
authorMatthias Kruk <m@m10k.eu>
Sun, 27 Jun 2021 01:35:41 +0000 (10:35 +0900)
committerMatthias Kruk <m@m10k.eu>
Sun, 27 Jun 2021 01:35:41 +0000 (10:35 +0900)
The publish-subscribe functions added by the previous commit are
independent of the endpoint API, making it necessary to separately
implement authentication and message encapsulation for pubsub
messaging.
This commit moves the pubsub functionality into the endpoint API,
making it unnecessary to re-implement message authentication and
encapsulation.

include/ipc.sh

index 9f9802cc5b46db9a88d37537d9dd6fce699a35e3..7515bad83b8f132316e0fb070ff019708cc24dca 100644 (file)
@@ -468,7 +468,7 @@ ipc_endpoint_open() {
        endpoint="$__ipc_root/$name"
 
        if ! [ -d "$endpoint" ]; then
-               if ! mkdir -p "$endpoint"; then
+               if ! mkdir -p "$endpoint/subscriptions"; then
                        return 1
                fi
 
@@ -490,6 +490,7 @@ ipc_endpoint_close() {
        local name="$1"
 
        local endpoint
+       local subscription
 
        endpoint="$__ipc_root/$name"
 
@@ -497,6 +498,12 @@ ipc_endpoint_close() {
                return 1
        fi
 
+       while read -r subscription; do
+               if ! rm "$subscription/$__ipc_self"; then
+                       log_error "Could unsubscribe $name from $subscription"
+               fi
+       done < <(find "$endpoint/subscriptions")
+
        if ! rm -rf "$endpoint"; then
                return 1
        fi
@@ -609,86 +616,83 @@ ipc_endpoint_recv() {
        return 1
 }
 
-_ipc_pubsub_create_topic() {
+_ipc_endpoint_topic_create() {
        local topic="$1"
 
-       local pubsub
-
-       pubsub="$__ipc_pubsub_root/$topic"
-
-       if ! mkdir -p "$pubsub"; then
+       if ! mkdir -p "$__ipc_pubsub_root/$topic"; then
                return 1
        fi
 
        return 0
 }
 
-_ipc_pubsub_subscribe() {
-       local topic="$1"
+_ipc_endpoint_topic_subscribe() {
+       local endpoint="$1"
+       local topic="$2"
+
+       local topicdir
 
-       local sub
+       topicdir="$__ipc_pubsub_root/$topic"
 
-       sub="$__ipc_pubsub_root/$topic/$__ipc_self"
+       if ! ln -sf "$endpoint" "$topicdir/$__ipc_self"; then
+               return 1
+       fi
 
-       if ! [ -e "$sub" ] && ! queue_init "$sub"; then
+       if ! ln -sf "$topicdir" "$__ipc_root/$endpoint/subscriptions/$topic"; then
+               rm -f "$topicdir/$__ipc_self"
                return 1
        fi
 
        return 0
 }
 
-ipc_pubsub_recv() {
+_ipc_endpoint_topic_get_subscribers() {
        local topic="$1"
-       local -i timeout="$2"
-
-       local queue
 
-       if (( $# < 2 )); then
-               timeout=-1
-       fi
+       local subscription
 
-       if ! _ipc_pubsub_create_topic "$topic"; then
-               return 1
+       while read -r subscription; do
+               local subscriber
 
-       elif ! _ipc_pubsub_subscribe "$topic"; then
-               return 1
+               if ! subscriber=$(readlink "$subscription"); then
+                       continue
+               fi
 
-       elif ! queue_get "$__ipc_pubsub_root/$topic/$__ipc_self" "$timeout"; then
-               return 1
-       fi
+               echo "$subscriber"
+       done < <(find "$__ipc_pubsub_root/$topic" -mindepth 1 -maxdepth 1 -type l)
 
        return 0
 }
 
-_ipc_pubsub_get_subscribers() {
-       local topic="$1"
-
-       local subscriber
+ipc_endpoint_subscribe() {
+       local endpoint="$1"
+       local topic="$2"
 
-       while read -r subscriber; do
-               local normalized
+       if ! _ipc_endpoint_topic_create "$topic"; then
+               return 1
+       fi
 
-               if normalized=$(realpath "$subscriber"); then
-                       echo "$normalized"
-               fi
-       done < <(find "$__ipc_pubsub_root/$topic" -mindepth 1 -maxdepth 1 -type d)
+       if ! _ipc_endpoint_topic_subscribe "$endpoint" "$topic"; then
+               return 1
+       fi
 
        return 0
 }
 
-ipc_pubsub_send() {
-       local topic="$1"
-       local msg="$2"
+ipc_endpoint_publish() {
+       local endpoint="$1"
+       local topic="$2"
+       local message="$3"
 
        local subscriber
 
-       if ! _ipc_pubsub_create_topic "$topic"; then
+       if ! _ipc_endpoint_topic_create "$topic"; then
                return 1
        fi
 
        while read -r subscriber; do
-               queue_put "$subscriber" "$msg"
-       done < <(_ipc_pubsub_get_subscribers "$topic")
+               ipc_endpoint_send "$endpoint" "$subscriber" "$message"
+       done < <(_ipc_endpoint_topic_get_subscribers "$topic")
 
        return 0
 }