return 0
}
-ipc_msg_new() {
+_ipc_msg_new() {
local source="$1"
local destination="$2"
local data_raw="$3"
}
ipc_endpoint_send() {
- local endpoint="$1"
- local msg="$2"
+ local source="$1"
+ local destination="$2"
+ local data="$3"
+
+ local msg
+
+ if ! msg=$(_ipc_msg_new "$source" "$destination" "$data"); then
+ return 1
+ fi
- if ! _ipc_endpoint_put "$endpoint" "$msg"; then
+ if ! _ipc_endpoint_put "$destination" "$msg"; then
return 1
fi
local endpoint="$1"
local -i timeout="$2"
- local msg
+ local -i start
- if ! msg=$(_ipc_endpoint_get "$endpoint" "$timeout"); then
- return 1
+ if (( $# < 2 )); then
+ timeout=-1
fi
- echo "$msg"
- return 0
+ if ! start=$(date +"%s"); then
+ return 2
+ fi
+
+ while true; do
+ local msg
+ local -i elapsed
+ local -i remaining
+
+ remaining="$timeout"
+
+ if (( timeout > 0 )); then
+ local now
+
+ if ! now=$(date +"%s"); then
+ return 2
+ fi
+
+ elapsed=$((now - start))
+ remaining=$((timeout - elapsed))
+
+ # Remaining must not be negative because _ipc_endpoint_get() takes
+ # that to mean "block (possibly forever) until a message arrives"
+ if (( remaining < 0 )); then
+ remaining=0
+ fi
+ fi
+
+ if msg=$(_ipc_endpoint_get "$endpoint" "$remaining"); then
+ if _ipc_msg_validate "$msg"; then
+ echo "$msg"
+ return 0
+ fi
+
+ log_info "Dropping invalid message on $endpoint"
+ fi
+
+ if (( remaining == 0 )); then
+ break
+ fi
+ done
+
+ return 1
}