endpoint="$__ipc_root/$name"
if ! [ -d "$endpoint" ]; then
- if ! mkdir -p "$endpoint"; then
+ if ! mkdir -p "$endpoint/subscriptions"; then
return 1
fi
local name="$1"
local endpoint
+ local subscription
endpoint="$__ipc_root/$name"
return 1
fi
+ while read -r subscription; do
+ if ! rm "$subscription/$__ipc_self"; then
+ log_error "Could unsubscribe $name from $subscription"
+ fi
+ done < <(find "$endpoint/subscriptions")
+
if ! rm -rf "$endpoint"; then
return 1
fi
return 1
}
-_ipc_pubsub_create_topic() {
+_ipc_endpoint_topic_create() {
local topic="$1"
- local pubsub
-
- pubsub="$__ipc_pubsub_root/$topic"
-
- if ! mkdir -p "$pubsub"; then
+ if ! mkdir -p "$__ipc_pubsub_root/$topic"; then
return 1
fi
return 0
}
-_ipc_pubsub_subscribe() {
- local topic="$1"
+_ipc_endpoint_topic_subscribe() {
+ local endpoint="$1"
+ local topic="$2"
+
+ local topicdir
- local sub
+ topicdir="$__ipc_pubsub_root/$topic"
- sub="$__ipc_pubsub_root/$topic/$__ipc_self"
+ if ! ln -sf "$endpoint" "$topicdir/$__ipc_self"; then
+ return 1
+ fi
- if ! [ -e "$sub" ] && ! queue_init "$sub"; then
+ if ! ln -sf "$topicdir" "$__ipc_root/$endpoint/subscriptions/$topic"; then
+ rm -f "$topicdir/$__ipc_self"
return 1
fi
return 0
}
-ipc_pubsub_recv() {
+_ipc_endpoint_topic_get_subscribers() {
local topic="$1"
- local -i timeout="$2"
-
- local queue
- if (( $# < 2 )); then
- timeout=-1
- fi
+ local subscription
- if ! _ipc_pubsub_create_topic "$topic"; then
- return 1
+ while read -r subscription; do
+ local subscriber
- elif ! _ipc_pubsub_subscribe "$topic"; then
- return 1
+ if ! subscriber=$(readlink "$subscription"); then
+ continue
+ fi
- elif ! queue_get "$__ipc_pubsub_root/$topic/$__ipc_self" "$timeout"; then
- return 1
- fi
+ echo "$subscriber"
+ done < <(find "$__ipc_pubsub_root/$topic" -mindepth 1 -maxdepth 1 -type l)
return 0
}
-_ipc_pubsub_get_subscribers() {
- local topic="$1"
-
- local subscriber
+ipc_endpoint_subscribe() {
+ local endpoint="$1"
+ local topic="$2"
- while read -r subscriber; do
- local normalized
+ if ! _ipc_endpoint_topic_create "$topic"; then
+ return 1
+ fi
- if normalized=$(realpath "$subscriber"); then
- echo "$normalized"
- fi
- done < <(find "$__ipc_pubsub_root/$topic" -mindepth 1 -maxdepth 1 -type d)
+ if ! _ipc_endpoint_topic_subscribe "$endpoint" "$topic"; then
+ return 1
+ fi
return 0
}
-ipc_pubsub_send() {
- local topic="$1"
- local msg="$2"
+ipc_endpoint_publish() {
+ local endpoint="$1"
+ local topic="$2"
+ local message="$3"
local subscriber
- if ! _ipc_pubsub_create_topic "$topic"; then
+ if ! _ipc_endpoint_topic_create "$topic"; then
return 1
fi
while read -r subscriber; do
- queue_put "$subscriber" "$msg"
- done < <(_ipc_pubsub_get_subscribers "$topic")
+ ipc_endpoint_send "$endpoint" "$subscriber" "$message"
+ done < <(_ipc_endpoint_topic_get_subscribers "$topic")
return 0
}