From c69b3ddd2acb4c3f33211e1aa488490963e38ba8 Mon Sep 17 00:00:00 2001 From: Matthias Kruk Date: Sat, 26 Jun 2021 19:10:33 +0900 Subject: [PATCH] include/ipc: Add publish-subscribe channel implementation This commit adds a naive queue-based pubsub implementation that allows scripts to send messages to multiple receivers at a time. --- include/ipc.sh | 86 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 86 insertions(+) diff --git a/include/ipc.sh b/include/ipc.sh index faab575..9f9802c 100644 --- a/include/ipc.sh +++ b/include/ipc.sh @@ -9,6 +9,8 @@ __init() { declare -gxr __ipc_public="$__ipc_root/pub" declare -gxr __ipc_private="$__ipc_root/priv/$USER" declare -gxr __ipc_group="toolbox_ipc" + declare -gxr __ipc_self="$HOSTNAME.$$" + declare -gxr __ipc_pubsub_root="$__ipc_root/pubsub" declare -gxir __ipc_version=1 @@ -606,3 +608,87 @@ ipc_endpoint_recv() { return 1 } + +_ipc_pubsub_create_topic() { + local topic="$1" + + local pubsub + + pubsub="$__ipc_pubsub_root/$topic" + + if ! mkdir -p "$pubsub"; then + return 1 + fi + + return 0 +} + +_ipc_pubsub_subscribe() { + local topic="$1" + + local sub + + sub="$__ipc_pubsub_root/$topic/$__ipc_self" + + if ! [ -e "$sub" ] && ! queue_init "$sub"; then + return 1 + fi + + return 0 +} + +ipc_pubsub_recv() { + local topic="$1" + local -i timeout="$2" + + local queue + + if (( $# < 2 )); then + timeout=-1 + fi + + if ! _ipc_pubsub_create_topic "$topic"; then + return 1 + + elif ! _ipc_pubsub_subscribe "$topic"; then + return 1 + + elif ! queue_get "$__ipc_pubsub_root/$topic/$__ipc_self" "$timeout"; then + return 1 + fi + + return 0 +} + +_ipc_pubsub_get_subscribers() { + local topic="$1" + + local subscriber + + while read -r subscriber; do + local normalized + + if normalized=$(realpath "$subscriber"); then + echo "$normalized" + fi + done < <(find "$__ipc_pubsub_root/$topic" -mindepth 1 -maxdepth 1 -type d) + + return 0 +} + +ipc_pubsub_send() { + local topic="$1" + local msg="$2" + + local subscriber + + if ! _ipc_pubsub_create_topic "$topic"; then + return 1 + fi + + while read -r subscriber; do + queue_put "$subscriber" "$msg" + done < <(_ipc_pubsub_get_subscribers "$topic") + + return 0 +} -- 2.47.3