From: Matthias Kruk Date: Sun, 27 Jun 2021 01:35:41 +0000 (+0900) Subject: include/ipc: Use endpoints to implement publish-subscribe functions X-Git-Url: https://git.corax.cc/?a=commitdiff_plain;h=e70dd28aaac34439b889a17c0ad9d24178bfc969;p=toolbox include/ipc: Use endpoints to implement publish-subscribe functions The publish-subscribe functions added by the previous commit are independent of the endpoint API, making it necessary to separately implement authentication and message encapsulation for pubsub messaging. This commit moves the pubsub functionality into the endpoint API, making it unnecessary to re-implement message authentication and encapsulation. --- diff --git a/include/ipc.sh b/include/ipc.sh index 9f9802c..7515bad 100644 --- a/include/ipc.sh +++ b/include/ipc.sh @@ -468,7 +468,7 @@ ipc_endpoint_open() { endpoint="$__ipc_root/$name" if ! [ -d "$endpoint" ]; then - if ! mkdir -p "$endpoint"; then + if ! mkdir -p "$endpoint/subscriptions"; then return 1 fi @@ -490,6 +490,7 @@ ipc_endpoint_close() { local name="$1" local endpoint + local subscription endpoint="$__ipc_root/$name" @@ -497,6 +498,12 @@ ipc_endpoint_close() { return 1 fi + while read -r subscription; do + if ! rm "$subscription/$__ipc_self"; then + log_error "Could unsubscribe $name from $subscription" + fi + done < <(find "$endpoint/subscriptions") + if ! rm -rf "$endpoint"; then return 1 fi @@ -609,86 +616,83 @@ ipc_endpoint_recv() { return 1 } -_ipc_pubsub_create_topic() { +_ipc_endpoint_topic_create() { local topic="$1" - local pubsub - - pubsub="$__ipc_pubsub_root/$topic" - - if ! mkdir -p "$pubsub"; then + if ! mkdir -p "$__ipc_pubsub_root/$topic"; then return 1 fi return 0 } -_ipc_pubsub_subscribe() { - local topic="$1" +_ipc_endpoint_topic_subscribe() { + local endpoint="$1" + local topic="$2" + + local topicdir - local sub + topicdir="$__ipc_pubsub_root/$topic" - sub="$__ipc_pubsub_root/$topic/$__ipc_self" + if ! ln -sf "$endpoint" "$topicdir/$__ipc_self"; then + return 1 + fi - if ! [ -e "$sub" ] && ! queue_init "$sub"; then + if ! ln -sf "$topicdir" "$__ipc_root/$endpoint/subscriptions/$topic"; then + rm -f "$topicdir/$__ipc_self" return 1 fi return 0 } -ipc_pubsub_recv() { +_ipc_endpoint_topic_get_subscribers() { local topic="$1" - local -i timeout="$2" - - local queue - if (( $# < 2 )); then - timeout=-1 - fi + local subscription - if ! _ipc_pubsub_create_topic "$topic"; then - return 1 + while read -r subscription; do + local subscriber - elif ! _ipc_pubsub_subscribe "$topic"; then - return 1 + if ! subscriber=$(readlink "$subscription"); then + continue + fi - elif ! queue_get "$__ipc_pubsub_root/$topic/$__ipc_self" "$timeout"; then - return 1 - fi + echo "$subscriber" + done < <(find "$__ipc_pubsub_root/$topic" -mindepth 1 -maxdepth 1 -type l) return 0 } -_ipc_pubsub_get_subscribers() { - local topic="$1" - - local subscriber +ipc_endpoint_subscribe() { + local endpoint="$1" + local topic="$2" - while read -r subscriber; do - local normalized + if ! _ipc_endpoint_topic_create "$topic"; then + return 1 + fi - if normalized=$(realpath "$subscriber"); then - echo "$normalized" - fi - done < <(find "$__ipc_pubsub_root/$topic" -mindepth 1 -maxdepth 1 -type d) + if ! _ipc_endpoint_topic_subscribe "$endpoint" "$topic"; then + return 1 + fi return 0 } -ipc_pubsub_send() { - local topic="$1" - local msg="$2" +ipc_endpoint_publish() { + local endpoint="$1" + local topic="$2" + local message="$3" local subscriber - if ! _ipc_pubsub_create_topic "$topic"; then + if ! _ipc_endpoint_topic_create "$topic"; then return 1 fi while read -r subscriber; do - queue_put "$subscriber" "$msg" - done < <(_ipc_pubsub_get_subscribers "$topic") + ipc_endpoint_send "$endpoint" "$subscriber" "$message" + done < <(_ipc_endpoint_topic_get_subscribers "$topic") return 0 }