From: Matthias Kruk Date: Wed, 14 Apr 2021 23:52:21 +0000 (+0900) Subject: include/queue: Add functions for file queues and duplicate-free queues X-Git-Url: https://git.corax.cc/?a=commitdiff_plain;h=887bfe57b2f0e66626f727c2eb8f5a5925bab403;p=toolbox include/queue: Add functions for file queues and duplicate-free queues For certain automation tasks, it would be helpful if filesystem objects could be passed through queues. This commit adds such functionality to the queue module. For passing files, these functions can be used: - queue_put_file() - queue_get_file() For duplicate-free transient data: - queue_put_unique() - queue_get() For other transient data: - queue_put() - queue_get() There are two things worth noting: 1. The queues are line-based (transient data may not contain newlines) 2. Queues cannot be used for transient data and files at the same time --- diff --git a/include/queue.sh b/include/queue.sh index ad6cedf..c2a94e3 100644 --- a/include/queue.sh +++ b/include/queue.sh @@ -121,6 +121,14 @@ _queue_get_data() { echo "$path/data" } +_queue_get_filedir() { + local queue="$1" + + local path + + path=$(_queue_get_path "$queue") + echo "$path/files" +} queue_init() { local queue="$1" @@ -195,6 +203,158 @@ queue_put() { return "$err" } +_queue_contains() { + local queue="$1" + local item="$2" + + local data + local qdata + + data=$(_queue_get_data "$queue") + + while read -r qdata; do + if [[ "$qdata" == "$data" ]]; then + return 0 + fi + done + + return 1 +} + +queue_put_unique() { + local queue="$1" + local item="$2" + + local mutex + local sem + local data + local err + + # When this function returns success, the caller can be sure that + # the item is in the queue. However, this includes the case that + # it was already in the queue. Since the item is transient data, + # this behavior seems appropriate. Files are a different story. + + mutex=$(_queue_get_mutex "$queue") + sem=$(_queue_get_sem "$queue") + data=$(_queue_get_data "$queue") + + mutex_lock "$mutex" + + if _queue_contains "$queue" "$item"; then + err=-1 + else + if ! echo "$item" >> "$data"; then + err=1 + else + err=0 + fi + fi + + mutex_unlock "$mutex" + + if (( err == 0 )); then + if ! sem_post "$sem"; then + err=1 + fi + elif (( err < 0 )); then + err=0 + fi + + return "$err" +} + +_queue_move_to_q() { + local queue="$1" + local filepath="$2" + + local filedir + local filename + local data + local dest + + filedir=$(_queue_get_filedir "$queue") + data=$(_queue_get_data "$queue") + + filename="${filepath##*/}" + dest="$filedir/$filename" + + if ! cp -a "$filepath" "$dest"; then + return 1 + fi + + if ! echo "$dest" >> "$data"; then + log_error "Could not append to queue: $data" + + if ! rm -rf "$dest"; then + log_error "Could not remove file from queue: $dest" + fi + + return 1 + fi + + if ! rm -rf "$filepath"; then + log_error "Could not remove source file: $filepath" + fi + + return 0 +} + +queue_put_file() { + local queue="$1" + local filepath="$2" + + local mutex + local sem + local filedir + local filename + local data + local err + + # Unlike queue_put_unique(), this function returns failure if the + # file was already in the queue. The file queue does not allow + # duplicates because files would be overwritten. + + filename="${filepath##*/}" + + mutex=$(_queue_get_mutex "$queue") + filedir=$(_queue_get_filedir "$queue") + sem=$(_queue_get_sem "$queue") + + mutex_lock "$mutex" + + if ! mkdir -p "$filedir" &> /dev/null; then + err=1 + else + local dest + + dest="$filedir/$filename" + + if [ -e "$filedir/$filename" ]; then + # Must not succeed if the file was already in the queue + err=1 + else + if _queue_move_to_q "$queue" "$filepath"; then + err=0 + else + err=1 + fi + fi + fi + + mutex_unlock "$mutex" + + if (( err == 0 )); then + if ! sem_post "$sem"; then + err=1 + fi + elif (( err < 0 )); then + err=0 + fi + + return "$err" +} + queue_get() { local queue="$1" @@ -233,3 +393,63 @@ queue_get() { echo "$item" return 0 } + +queue_get_file() { + local queue="$1" + local destdir="$2" + + local sem + local mutex + local data + local item + local dest + local err + + if ! [ -d "$destdir" ]; then + log_error "Destination must be a directory" + return 1 + fi + + sem=$(_queue_get_sem "$queue") + mutex=$(_queue_get_mutex "$queue") + data=$(_queue_get_data "$queue") + + err=false + + if ! sem_wait "$sem"; then + return 1 + fi + + mutex_lock "$mutex" + + if ! item=$(head -n 1 "$data" 2>/dev/null); then + err=true + else + dest="$destdir/${item##*/}" + + if ! sed -i '1d' "$data" 2>/dev/null; then + log_error "Could not remove item from $data" + err=true + else + log_debug "Moving $item to $dest" + + if ! mv "$item" "$dest"; then + log_error "Could not move $item to $dest" + + if ! sed -i "1s|^|$item\n|" "$data"; then + log_error "Could not put item back in queue" + fi + err=true + fi + fi + fi + + mutex_unlock "$mutex" + + if "$err"; then + return 1 + fi + + echo "$dest" + return 0 +}