declare -gxr __ipc_public="$__ipc_root/pub"
declare -gxr __ipc_private="$__ipc_root/priv/$USER"
declare -gxr __ipc_group="toolbox_ipc"
+ declare -gxr __ipc_self="$HOSTNAME.$$"
+ declare -gxr __ipc_pubsub_root="$__ipc_root/pubsub"
declare -gxir __ipc_version=1
return 1
}
+
+_ipc_pubsub_create_topic() {
+ local topic="$1"
+
+ local pubsub
+
+ pubsub="$__ipc_pubsub_root/$topic"
+
+ if ! mkdir -p "$pubsub"; then
+ return 1
+ fi
+
+ return 0
+}
+
+_ipc_pubsub_subscribe() {
+ local topic="$1"
+
+ local sub
+
+ sub="$__ipc_pubsub_root/$topic/$__ipc_self"
+
+ if ! [ -e "$sub" ] && ! queue_init "$sub"; then
+ return 1
+ fi
+
+ return 0
+}
+
+ipc_pubsub_recv() {
+ local topic="$1"
+ local -i timeout="$2"
+
+ local queue
+
+ if (( $# < 2 )); then
+ timeout=-1
+ fi
+
+ if ! _ipc_pubsub_create_topic "$topic"; then
+ return 1
+
+ elif ! _ipc_pubsub_subscribe "$topic"; then
+ return 1
+
+ elif ! queue_get "$__ipc_pubsub_root/$topic/$__ipc_self" "$timeout"; then
+ return 1
+ fi
+
+ return 0
+}
+
+_ipc_pubsub_get_subscribers() {
+ local topic="$1"
+
+ local subscriber
+
+ while read -r subscriber; do
+ local normalized
+
+ if normalized=$(realpath "$subscriber"); then
+ echo "$normalized"
+ fi
+ done < <(find "$__ipc_pubsub_root/$topic" -mindepth 1 -maxdepth 1 -type d)
+
+ return 0
+}
+
+ipc_pubsub_send() {
+ local topic="$1"
+ local msg="$2"
+
+ local subscriber
+
+ if ! _ipc_pubsub_create_topic "$topic"; then
+ return 1
+ fi
+
+ while read -r subscriber; do
+ queue_put "$subscriber" "$msg"
+ done < <(_ipc_pubsub_get_subscribers "$topic")
+
+ return 0
+}