]> git.corax.cc Git - toolbox/commitdiff
include/ipc: Add publish-subscribe channel implementation
authorMatthias Kruk <m@m10k.eu>
Sat, 26 Jun 2021 10:10:33 +0000 (19:10 +0900)
committerMatthias Kruk <m@m10k.eu>
Sat, 26 Jun 2021 10:10:33 +0000 (19:10 +0900)
This commit adds a naive queue-based pubsub implementation that allows
scripts to send messages to multiple receivers at a time.

include/ipc.sh

index faab5758213ec57daf4776ee6fc103a3e881ebff..9f9802cc5b46db9a88d37537d9dd6fce699a35e3 100644 (file)
@@ -9,6 +9,8 @@ __init() {
        declare -gxr  __ipc_public="$__ipc_root/pub"
        declare -gxr  __ipc_private="$__ipc_root/priv/$USER"
        declare -gxr  __ipc_group="toolbox_ipc"
+       declare -gxr  __ipc_self="$HOSTNAME.$$"
+       declare -gxr  __ipc_pubsub_root="$__ipc_root/pubsub"
 
        declare -gxir __ipc_version=1
 
@@ -606,3 +608,87 @@ ipc_endpoint_recv() {
 
        return 1
 }
+
+_ipc_pubsub_create_topic() {
+       local topic="$1"
+
+       local pubsub
+
+       pubsub="$__ipc_pubsub_root/$topic"
+
+       if ! mkdir -p "$pubsub"; then
+               return 1
+       fi
+
+       return 0
+}
+
+_ipc_pubsub_subscribe() {
+       local topic="$1"
+
+       local sub
+
+       sub="$__ipc_pubsub_root/$topic/$__ipc_self"
+
+       if ! [ -e "$sub" ] && ! queue_init "$sub"; then
+               return 1
+       fi
+
+       return 0
+}
+
+ipc_pubsub_recv() {
+       local topic="$1"
+       local -i timeout="$2"
+
+       local queue
+
+       if (( $# < 2 )); then
+               timeout=-1
+       fi
+
+       if ! _ipc_pubsub_create_topic "$topic"; then
+               return 1
+
+       elif ! _ipc_pubsub_subscribe "$topic"; then
+               return 1
+
+       elif ! queue_get "$__ipc_pubsub_root/$topic/$__ipc_self" "$timeout"; then
+               return 1
+       fi
+
+       return 0
+}
+
+_ipc_pubsub_get_subscribers() {
+       local topic="$1"
+
+       local subscriber
+
+       while read -r subscriber; do
+               local normalized
+
+               if normalized=$(realpath "$subscriber"); then
+                       echo "$normalized"
+               fi
+       done < <(find "$__ipc_pubsub_root/$topic" -mindepth 1 -maxdepth 1 -type d)
+
+       return 0
+}
+
+ipc_pubsub_send() {
+       local topic="$1"
+       local msg="$2"
+
+       local subscriber
+
+       if ! _ipc_pubsub_create_topic "$topic"; then
+               return 1
+       fi
+
+       while read -r subscriber; do
+               queue_put "$subscriber" "$msg"
+       done < <(_ipc_pubsub_get_subscribers "$topic")
+
+       return 0
+}