Лейка

Год назад, после недолгого изучения Python и Common Lisp я взялся за Clojure. Примерно в то же время я открыл для себя местный файлообменный сайт, ныне опустившийся до непотребного в плане рекламы состояния, но все еще приносящий пользу, благо на нем обитает немало народу со всего дальнего востока. На этом сайте любой желающий может опубликовать свое сообщение — набор из картинок, текста описания и ссылок на залитые на файлообменник файлы. Часто среди опубликованного попадаются действительно стоящие вещи. И в большинстве случаев это добрая пачка ссылок на файлы общим весом в несколько гигабайт. Естественно, качать ручками из браузера и караулить не сломалось ли чего по ходу дела — не мой метод.

Ubuntu — моя домашняя система. В сообществе файлообменника есть программирующие люди, но они ориентируются в основном на пользователей Windows. На то есть веская причина, ведь согласно статистике линуксом пользуется всего один процент пользователей (согласно опросу). И это естественно, что под линукс программ нет. Точнее есть — скрипты некоторых пользователей, но они страшны как смертный грех. А тот один единственный достойный менеджер закачек и по совместительству плугин Фаерфокса (которым я не пользуюсь с тех пор как появился Хром) слишком «общий» и не достаточно хорошо заточен под местный сайт.

Я взялся написать качалку. Сперва пробовал написать её на Common Lisp, но у меня не получилось — я слишком плохо его знал. Потом попробовал написать её на Python. Получилось. Но программа была многопоточная, а питон и многопоточность — вещи не лучшим образом подходящие друг другу. Структура программы походила на многоэтажное здание, построенное из костылей.

Я бросил Python и взялся переписать программу на Clojure. Получилось не с первого раза, но процесс и результат меня порадовали: при всей многопоточности работа с состоянием так же проста, как в обычных, однопоточных императивных программах — происходит практически «не задумываясь», одним словом Clojure — отличный выбор для многопоточных сетевых приложений.

Если быть чуть более честным, то версия «Лейки», которую я здесь описываю является плодом многократных переписываний и всевозможных экспериментов над кодом, стилем и подходом к написанию программ на Clojure. Эту программу я переписывал около семи раз в течение года. Усилия, что я приложил к этому — хорошая цена за полученный мною опыт. В результате каждой такой переписи я осваивал тот или иной прием программирования. И изложенная ниже финальная версия программы представляет собой предельно идиоматичный Clojure-код который я сейчас способен написать.

В этой статье целиком и полностью, в стиле «литературного программирования», описана моя маленькая программа, многопоточный консольный менеджер закачек «Лейка». Я пользуюсь этой программой уже очень давно и скачал ею не одну сотню гигабайт, так что я надеюсь, эта программа послужит читателю если и не для практических целей, то хотябы для учебных. Но прежде чем перейти к деталям реализации я сделаю большое отступление и расскажу о тонкостях работы с состоянием в Clojure.

Работа с состоянием в Clojure

Clojure — особенный императивный язык. В отличие от подавляющего большинства языков программирования, в которых есть только один способ работы с состоянием — старые добрые переменные, в Clojure их как минимум четыре, из-за специфичного подхода программной транзакционной памяти (software transactional memory, STM).

Подход к состоянию в Clojure довольно прост — он основывается на четком разграничении значения, состояния и идентичности, которые во многих языках объединены в одну сущность.

Значение (value)
Нечто неизменяемое, или совокупность неизменяемых величин. Например число 3 — оно, как говорится, и в Африке 3 — в математическом, вневременном смысле; все тройки одинаковы, в том смысле, что существует только одна «тройка».

Идентичность (identity)
Сущность, которую мы ассоциируем с последовательностью состояний во времени. Даже если 2 идентичности имеют одинаковое значение (или одно и то же значение — тут никакой разницы), они не будут равны друг другу.

Состояние (state)
Значение идентичности в некоторый момент времени.

Идентичностями (т.е. изменяемыми объектами) в Clojure являются переменные (variables) и ссылки (references), всего их 4 вида; каждый вид используется для своих целей.

Переменные (variables)
могут изменяться только внутри одного единственного потока, поэтому они используются как глобальные переменные — к ним привязываются функции, макросы и просто значения; из-за динамической привязки отлично подходят для разного рода аспектно-ориентированного программирования.

Атомы (atoms)
это переменные, доступные для чтения и изменения из всех потоков; изменяются атомарно, по-отдельности, каждый атом в своей транзакции. Ипользуются как старые добрые переменные в обычных языках.

Ссылки (references)
как атомы, только изменяются внутри явно обозначенной программистом транзакции, поэтому используются для одновременного координированного группового изменения.

Агенты (agents)
это переменные, доступные изо всех потоков, для изменения которых нужно отправить им сообщение, состоящее из функции с аргументами. После отправки сообщения агенту программа продолжает свою работу; изменение состояния агента (вычисление функции сообщения) происходит в отдельном потоке, после чего агент принимает значение результата вычисления сообщения. Агенту можно послать сразу несколько сообщений, они сохранятся в очереди сообщений и будут обработаны последовательно. До тех пор, пока не будет вычислено сообщение, агент сохраняет свое прежнее значение.

Агенты — ключевые компоненты в этой программе, поэтому о них стоит рассказать подробнее.

Стиль программирования агентов Clojure в миру зовется asynchronous message-passing concurrency. Здесь asynchronous означает отсутствие необходимости дожидаться изменения агента после отправки ему сообщения.

Агенты используются как основные «рабочие лошадки». В отличие от ссылок и атомов, изменять которые можно только «чистыми» функциями (точнее, очень рекомендуется — из-за отката и повтора транзакций побочные эффекты могут причинить неприятности); сообщения агентов, как правило — функции с побочными эффектами.

В статье местами я использую терминологию слегка нетипичную для Clojure, например действие агента — это то же самое, что и функция-сообщение которое можно отправить агенту. Тело агента — то же самое что и состояние агента. Выполнение действия — обработка сообщения.

Агенту можно отправить сообщение двумя способами. Первый — для «быстрых» или процессороёмких, обычно без побочных эффектов, сообщений. Такие сообщения будут выполняться на ограниченном, в зависимости от количества ядер процессора, количестве потоков:

  (send agent function args)

Другой вариант для «долгоиграющих», обычно с обильным вводом-выводом, сообщений. Такие будут выполняться на большем количестве потоков, чем сообщения отправленные send'ом.

  (send-off agent function args)

Состояние агента можно узнать в любое время — для этого не надо ждать окончания обработки отправленного ему сообщения:

  (deref agent)
  или
  @agent

Если во время обработки сообщения возникает ошибка, т.е. выбрасывается исключение — оно сохраняется в агенте; ошибки агента можно увидеть вызвав функциию agent-errors. При этом агент становится недоступным для сообщений до тех пор, пока не будет очищен от ошибок функцией clear-agent-errors.

Во время обработки агентом сообщения внутри функции-сообщения становится доступной переменная *agent*, значением которой является агент, который обрабатывает сообщение. Таким образом агент может посылать сообщения самому себе. Также, не-нилевое значение переменной *agent* является признаком того, что код выполняется внутри действия агента.

Все сообщения которые агент отсылает себе или другим агентам во время выполнения действия задерживаются до окончания действия, что доставляет неудобства, так как в большинстве случаев бывает необходима мгновенная отправка. В подобных случаях используется функция release-pending-sends.

Дизайн конкурентной программы

В отличие от простых, последовательных программ, работающих в одном потоке, сконструировать конкурентную программу куда сложнее — из-за большого количества потенциальных взаимодействий между её частями, работающих в разных потоках (что, собственно и является смыслом слова «конкурентный»). Но если следовать следующим простым правилам, описанным в 5 главе «Message-Passing Concurrency» книги «Concepts, Techniques, and Models of Computer Programming» задача сильно упрощается.

  1. Неформальная спецификация. Первым делом нужно определить — что же программа должна делать?

  2. Компоненты. Необходимо перечислить все формы конкурентной активности — каждая из них становится компонентом (например, агентом). Далее следует нарисовать блочную диаграмму системы, в которой будут показаны все экземпляры компонентов.

  3. Протокол сообщений. Решить какие сообщения будут посылать компоненты и спроектировать протоколы соообщений между ними. Нарисовать диаграмму компонентов со всеми протоколами сообщений.

  4. Диаграммы состояний. Для каждого конкурентного компонента нужно нарисовать диаграмму состояний и проверить, что в каждом состоянии компонент получает и посылает правильные сообщения и выполняет правильные действия.

  5. Закодировать и распланировать. Закодировать систему на любимом языке программирования и выбрать любимый алгоритм планирования взаимодействий между компонентами.

  6. Протестировать и повторять до тех пор пока программа не станет работать так как от нее ожидается.

В общих чертах я следовал этой схеме, но картинок в статье я приводить не стану. В них нет особой надобности.

Неформальная спецификация

У нас есть текстовый файл наполненный ссылками на страницы разнообразных файлообменных ресурсов или прямыми ссылками непосредственно на файлы на этих ресурсах. Нам нужно скачать эти файлы с файлообменников, причем их можно качать в несколько потоков, в зависимости от возможностей, предоставляемых конкретными сервисами. По ходу скачивания могут возникнуть проблемы — на файлообменнике может не оказаться файла, загрузка может оборваться, на диске может быть недостаточно места. При обрыве связи загрузка должна возобновляться, если это возможно. Программа консольная и будет работать в пакетном режиме, при этом во время её работы в консоли будет отображаться полоса прогресса загрузки.

Скомпилированная в Jar программа будет запускаться так:

  java -jar leica.jar путь-к-файлу-с-ссылками [директория-куда-качать]

В командной строке указывается путь к файлу с ссылками и, необязательно, директория в которую будут скачиваться файлы. Эту информацию мы отразим в небольшой подсказке, которая будет выдаваться при запуске программы с ключом --help.

<help-message>= (U->)
"Leica -- downloader written in lisp.

Run:

java -jar leica.jar [keys] [file with links] [directory]"

Структура программы

Почти вся программа находится в одном файле. В этой статье описывается файл leica.clj. В репозитории есть еще один файл с модулем самодельного логгера log.clj. Признаюсь, мне не хватило терпения осилить ни один из трех монструозных джавовских логгеров, поэтому я написал свой собственный.

Clojure — язык настолько же функциональный, насколько и императивный, поэтому перед использованием имени в программе оно обязательно должно быть определено. Это ведет к тому, что программы на Clojure пишутся снизу-вверх. В этой статье я излагаю программу для лучшего её восприятия читателем, а не компьютером — и так и этак, я прыгаю с одного уровня на другой, перемешивая восходящий и нисходящий порядок изложения кода.

Вот вся программа, как она видна «с высоты птичьего полета». Программа разбита на несколько основных секций: сперва определяется пространство имен в котором находится весь этот код, затем все встречающиеся в программе имена, диспетчеры мультиметодов, мультиметоды, в блоке definitions определяются значения всех имен, в самом конце определена главная процедура -main.

<leica.clj>=
;;; -*- mode: clojure; coding: utf-8 -*-

;; Copyright (C) 2010 Roman Zaharov <zahardzhan@gmail.com>

;; This program is distributed in the hope that it will be useful,
;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.

<namespace declaration>
(declare <names>)
<macros>
<multimethod dispatch functions>
<multimethods>
<definitions>
<main procedure>

В определении пространства имен указываются все используемые в коде модули и имена из них.

<namespace declaration>= (<-U)
(ns leica
  (:gen-class)
  (:use
   [log :only [info debug error]]
   [clojure.set :only [difference union]]
   [clojure.contrib.duck-streams :only [slurp* read-lines]]
   clojure.contrib.command-line
   clojure.contrib.def
   clojure.test
   hooks)

  (:require
   [clojure.contrib.io :as io])
  
  (:import
   (java.util Date)
   (java.net URLDecoder
             ConnectException)   
   (java.io File
            FileOutputStream
            InputStream)
   (org.apache.commons.httpclient URI
                                  HttpClient
                                  HttpStatus
                                  ConnectTimeoutException 
                                  NoHttpResponseException
                                  methods.GetMethod
                                  methods.HeadMethod
                                  params.HttpMethodParams
                                  util.EncodingUtil)))
Defines leica (links are to index).

Компоненты

В программе будет три взаимодействующих компонента, каждый из которых представляет собой некоторый вид конкурентной активности — многочисленные загрузки, планировщик загрузок и монитор прогресса загрузок. Все эти компоненты представлены агентами Clojure и работают одновременно, координируя свои действия между собой.

Загрузки это основные работники в этой программе. Эти агенты управляют всем процессом загрузки файла, начиная от получения ссылки со страницы с файлообменника и заканчивая собственно скачиванием файла. Загрузки могут быть высокоуровневыми и по ходу своей работы создавать и управлять более низкоуровневыми загрузками. Загрузки создаются конструктором make-download. Все наличествующие в программе загрузки хранятся в глобальной переменной downloads* — это ссылка на множество всех загрузок.

Загрузки автономны, каждая из них имеет собственную программу и сама управляет своими действиями, поэтому внешний интерфейс к ним весьма прост; единственное, что можно сделать с загрузкой — отправить ей сообщение запуска run и тогда загрузка, в зависимости от ситуации в которой она находится, выполнит действие предписанное ей её программой.

Планировщик загрузок это агент который координирует совместную работу загрузок друг с другом: он составляет план, запускает загрузки и следит за тем чтобы между ними не возникали конфликты из-за ресурсов. Планировщик представлен глобальной переменной download-scheduler*.

Планировщик фактически является реактивным агентом, т.е. без внутреннего состояния — он работает с глобальным состоянием всей программы). Ему можно послать сообщение schedule-downloads в ходе обработки которого он определит загрузки которые нужно запустить, а затем пошлет им сообщение запуска run и сообщение, предписывающее запустить планировщик после завершения выполнения действия загрузкой. Тем самым планировщик и загрузки попеременно запускают друг друга — этот цикл сообщений и является основным механизмом программы; когда он завершается — завершается и работа программы.

Монитор прогресса это агент который отвечает за отображение информации о работе программы. Он отрисовывает полосу прогресса в консоли. Монитор представлен глобальной переменной progress-monitor*.

Монитор принимает сообщения от загрузок о начале слежения за прогрессом загрузки begin-monitor-progress, об отрисовке в консоли прогресса загрузки update-progress и о прекращении слежения за прогрессом загрузки cease-monitor-progress.

Главная процедура

Главная процедура обрабатывает аргументы командной строки. Выбирается читабельный файл и из него считываются строки. Выбирается директория в которую есть доступ на запись. Если в наличии нет файла или директории — программ завершается функцией exit-program. Затем из ссылок в считанных из файла строках создаются загрузки и запускается планировщик загрузок, который завершает работу программы по окончании планирования.

<main procedure>= (<-U)
(defn -main [& args]
(with-command-line args
 <help-message>
 [remaining-args]

    (let [lines-with-links
          (read-lines (some #(as-file % :readable true :directory false) remaining-args))

          workpath
          (or (some #(as-file % :writeable true :directory true) remaining-args)
              (as-file (System/getProperty "user.dir") :writeable true :directory true))]

      (when-not lines-with-links
        (info "You must specify file with links to download.")
        (exit-program))

      (when-not workpath
        (info "You must specify directory in which files will be downloaded.")
        (exit-program))

      (doseq [line lines-with-links]
        (make-download line :path workpath))

      (send-off download-scheduler* assoc :when-done exit-program)
      (send-off download-scheduler* schedule-downloads))))
Defines -main (links are to index).

<names>= (<-U) [D->]
exit-program

<definitions>= (<-U) [D->]
(defn exit-program []
  (debug "Leica is done. Bye.")
  (System/exit 0))
Defines exit-program (links are to index).

Функция as-file это своеобразный швейцарский нож для работы с файлами который я использую в основном для выяснения, существует ли некий файл с определенными характеристиками. Например (as-file path :directory true :writeable true) возвращает файл path если это файл директории доступной для записи, в противном случае он возвращает nil.

Функция написана в «maybe-монадообразном стиле» характерном для кода на Хаскелле — аргумент с помощью макроса-комбинатора прогоняется через множество maybe-функций, и если на некотором шаге одна из функций возвращает nil, то и в конце возвращается nil.

<names>+= (<-U) [<-D->]
as-file

<definitions>+= (<-U) [<-D->]
(defn as-file
  [arg & {:as args :keys [exists create readable writeable directory]}]
  (let [argtype (type arg)
        maybe-create
        (fn [f] 
          (when f 
            (cond (and (= create true) (not (.exists f)))
                  (let [dir (File. (.getParent f))]
                    (if-not (.exists dir)
                      (throw (new Exception
                                  "Cannot create file in nonexistant directory."))
                      (if-not (.canWrite dir)
                        (throw (new Exception
                                    "Cannot create file in nonwriteable directory."))
                        (do (.createNewFile f) f))))
                  :else f)))
        maybe-exists
        (fn [f]
          (when f
            (cond (= exists true) (when (.exists f) f)
                  (= exists false) (when-not (.exists f) f)
                  (not exists) f)))
        maybe-directory
        (fn [f]
          (when f
            (cond (= directory true) (when (.isDirectory f) f)
                  (= directory false) (when-not (.isDirectory f) f)
                  (not directory) f)))
        maybe-readable
        (fn [f]
          (when f
            (cond (= readable true) (when (.canRead f) f)
                  (= readable false) (when-not (.canRead f) f)
                  (not readable) f)))
        maybe-writeable
        (fn [f]
          (when f
            (cond (= writeable true) (when (.canWrite f) f)
                  (= writeable false) (when-not (.canWrite f) f)
                  (not writeable) f)))]

    (cond (= argtype File)
          (-> arg maybe-create maybe-exists maybe-directory maybe-readable maybe-writeable)

          (= argtype String)
          (if args
            (apply as-file (new File arg) (flatten (seq args)))
            (as-file (new File arg))))))
Defines as-file (links are to index).

Загрузка

Загрузка (download) является агентом и создается из соответствующего прототипа загрузки с помощью конструктора make-download из строки line содержащей ссылку на файлообменный ресурс. Конструктор make-download в качестве опциональных ключей принимает программу загрузки :program, каталог в который должны скачиваться файлы :path и имя загрузки :name. Каждой новой загрузке автоматически присваивается её порядковый номер :precedence и она добавляется ко множеству уже созданных загрузок downloads* чтобы планировщик загрузок мог обеспечить их корректную работу друг с другом.

Тело загрузки является хэшем (здесь и далее термин «хэш» обозначает базовую для Clojure структуру данных hash-map) который представляет текущее состояние агента загрузки. Разные загрузки создаются из разных прототипов и с разными телами — в зависимости от того какие функции они выполняют.

Тип загрузки это метка хранящаяся в ключе :type метаданных тела загрузки. Свой тип (и соответственно прототип) загрузки определяются для каждого отдельного сетевого файлообменного сервиса с которым работают загрузки.

Программа загрузки это функция, которая определяет каждое следующее действие загрузочного агента, она полностью контролирует его поведение; на вход она принимает тело загрузки, на выходе выдает функцию-действие, которую должен выполнить загрузочный агент.

Действие загрузочного агента это функция которая применяется к телу агента, она выполняет некоторую полезную работу и изменяет состояние агента.

Возникающие во время выполнения действия ошибки, непредусмотренные или возникающие по воле программиста, должны приводить к выбрасыванию исключения.

Большинство загрузочных агентов содержат следующие ключи в своих телах:

:link
это самая важная часть загрузочного агента — ссылка которую загрузка будет обрабатывать и ради обработки которой она создавалась. Например загрузки которые обрабатывают прямые ссылки на файл просто загружают этот файл, в процессе следя за тем, чтобы не кончилось место на диске или не было дубликатов. Или если загрузка создана для обработки ссылок на пользовательские страницы файлообменника, то она как правило занимается тем, что вырезает прямые ссылки из страницы, а затем создает и запускает новые загрузки для каждой прямой ссылки.

:link-pattern
это неотемлемая часть всех прототипов загрузок и используется только в момент создания новой загрузки из прототипа. Содержит регулярное выражение, с которым сравнивается строка line, переданная в конструктор загрузки make-download.

:name
это имя загрузки. Обычно это имя загружаемого файла или заголовок страницы которую парсит высокоуровневая загрузка при получении прямых ссылок.

:path
это директория в которую загружаются файлы.

:program
это программа загрузочного агента.

:precedence
это порядковый номер загрузки.

:alive
определяет жива ли загрузка. Если загрузка мертва, то она не может выполнить никакое действие. Для проверки используется предикаты alive? и dead? соответственно.

:failed
говорит о том, возникла ли ошибка во время выполнения прошлого действия (т.е. было ли брошено исключение во время выполнения действия). После каждого удачного действия сбрасывается на false. Для проверки используется предикат failed?.

:fail-reason
это причина провала действия агента; содержит исключение которое было брошено во время выполнения последнего действия. Селектор — fail-reason. Сбрасывается на nil после каждого удачного действия.

:run-atom
принимает истинное значение, если в данный момент загрузочный агент выполняет некоторое действие. Для проверки используется предикат running?.

:stop-atom
принимает истинное значение, если загрузочный агент должен быть остановлен. В данный момент в программе никак не используется.

:file-length-atom
содержит длину в байтах уже (частично) загруженного на компьютер файла, эта величина изменяется во время действия загрузочного агента, поэтому для хранения используется атом.

:child-link
используется в высокоуровневых загрузках для хранения ссылки из которой создется дочерняя загрузка.

:child
используется в высокоуровневых загрузках для хранения ссылки на дочерние загрузки.

:max-running-downloads
определяет максимально допустимое количество запущенных (т.е. одновременно выполняющих некое действие) загрузок данного типа (т.е. работающих с определенным сервисом). Используется в основном планировщиком загрузок и указывается в прототипе загрузки, если это ограничение имеет смысл для сетевого сервиса.

<names>+= (<-U) [<-D->]
make-download downloads-precedence-counter

<definitions>+= (<-U) [<-D->]
(def downloads-precedence-counter (atom 0))

(defn make-download [line & {:as opts :keys [program path name]}]
  (when-let [download-prototype (download-prototype-matching-address line)]
    (let-return
     [dload (agent
             (merge
              download-prototype
              {:precedence (swap! downloads-precedence-counter inc)
               :alive true
               :failed false
               :fail-reason nil
               :run-atom (atom false)
               :stop-atom (atom false)}
              (dissoc opts :program :path :name)
              (when (supplied program)
                (if-not (ifn? program)
                  (throw (Exception. "Program must be invokable."))
                  {:program program}))
              (when (supplied path)
                (if-let [valid-path (as-file path :directory true :writeable true)]
                  {:path valid-path}
                  (throw (Exception. "Path must be writeable directory."))))
              (when (supplied name)
                (if-not (string? name)
                  (throw (Exception. "Name must be string."))
                  {:name name}))))]
     (add-to-downloads dload))))
Defines downloads-precedence-counter, make-download (links are to index).

Здесь используются функции для добавления и удаления загрузок из множества всех загрузок downloads*.

<names>+= (<-U) [<-D->]
downloads* add-to-downloads remove-from-downloads

<definitions>+= (<-U) [<-D->]
(def downloads* (ref #{}))

(defn add-to-downloads [dload]
  (dosync (alter downloads* union (hash-set dload))))

(defn remove-from-downloads [dload]
  (dosync (alter downloads* difference (hash-set dload))))
Defines add-to-downloads, downloads*, remove-from-downloads (links are to index).

Вспомогательные макросы with-return и let-return я очень часто использую для явного указания значения, которое возвращает блок кода, потому как большинство функций агентов обязаны что-то возвращать, при этом производя некоторые побочные эффекты.

<macros>= (<-U) [D->]
(defmacro with-return [expr & body]
  `(do (do ~@body)
       ~expr))

(defmacro let-return [[form val] & body]
  `(let [~form ~val]
     (with-return ~form
       (do ~@body))))
Defines let-return, with-return (links are to index).

Макрос supplied это просто переименованный and, который я использую для проверки переданных в функцию опциональных ключей-аргументов.

<macros>+= (<-U) [<-D->]
(defalias supplied and)
Defines supplied (links are to index).

Прототип загрузки

Прототип загрузки это обычный хэш, он служит заготовкой из которой затем собирается тело загрузочного агента. Прототипы хранятся в глобальной переменной download-prototypes* — это атом с хэшем в котором ключи — метки типов прототипов, а значения — сами прототипы загрузок.

Прототип загрузки определяется макросом def-download-prototype, этот макрос создают переменную, значением которой является хэш с меткой типа унаследованной от метки ::download в ключе :type в метаданных. Диспетчеризация по типу широко используется в мультиметодах селекторов и действиях загрузок.

<names>+= (<-U) [<-D->]
download-prototypes*

<definitions>+= (<-U) [<-D->]
(def download-prototypes* (atom {}))
Defines download-prototypes* (links are to index).

<macros>+= (<-U) [<-D->]
(defmacro def-download-prototype [name body]
  `(let [name-keyword# (keyword (str *ns*) (str (quote ~name)))]
     (def ~name (with-meta ~body {:type name-keyword#}))
     (derive name-keyword# ::download)
     (swap! download-prototypes* assoc name-keyword# ~name)
     ~name))
Defines def-download-prototype (links are to index).

В качестве исходных данных в лейку передается список ссылок. Причем ссылки с определенными адресами обрабатываются загрузками определенного типа. Для получения прототипа загрузки, которая должна обрабатывать данную ссылку используется функция download-prototype-matching-address — она выбирает нужный прототип сравнивая ссылку с регулярным выражением в ключе :link-pattern прототипа.

<names>+= (<-U) [<-D->]
download-prototype-matching-address

<definitions>+= (<-U) [<-D->]
(defn download-prototype-matching-address [line]
  (when-let [url (extract-url line)]
    (first (for [[download-type download-prototype] @download-prototypes*
                 :when (:link-pattern download-prototype)
                 :let [link (re-find (:link-pattern download-prototype) url)]
                 :when link]
             (assoc download-prototype :link link)))))
Defines download-prototype-matching-address (links are to index).

Здесь функция extract-url вырезает из произвольной строки первый попавшийся URL-адрес.

<names>+= (<-U) [<-D->]
extract-url

<definitions>+= (<-U) [<-D->]
(defn extract-url [line]
  (first (re-find #"((https?|ftp|file):((//)|(\\\\))+[\w\d:#@%/;$()~_?\+-=\\\.&]*)"
                  line)))
Defines extract-url (links are to index).

Запуск загрузки

Запуск осуществляет метод run. Во избежание запуска тела загрузки при работающем агенте или запуска мертвой или остановленной загрузки перед запуском осуществляются соответствующие проверки. На время работы агента атом run-atom в теле загрузки устанавливается в значение true. Действие action которое должна выполнить загрузка определяется её программой program или задается явно опциональным аргументом :action. Если во время выполнения действия исключений не возникло — загрузка переходит в бездействующее состояние, в противном случае исключение ловится и сохраняется в теле загрузки.

<multimethods>= (<-U) [D->]
(defmulti run type-dispatch)
Defines run (links are to index).

<definitions>+= (<-U) [<-D->]
(defmethod run ::download
  [{:as dload :keys [program run-atom stop-atom failed]} & {action :action}]

  (when (and (not *agent*) (running? dload))
    (throw (Exception. "Cannot run download while it is running.")))
  
  (debug "Run download " (represent dload))

  (if (or (dead? dload) (stopped? dload)) dload
      (try (reset! run-atom true)
           (let [action (or action (program dload))]
             (idle (action dload)))
           ;; (catch Error RuntimeException ...)
           (catch Throwable e (fail dload :reason e))
           (finally (reset! run-atom false)))))

Предикат running? проверяет — запущен ли агент?

<multimethods>+= (<-U) [<-D->]
(defmulti running? type-dispatch)
Defines running? (links are to index).

<definitions>+= (<-U) [<-D->]
(defmethod running? ::download [dload]
  (deref (:run-atom dload)))

Остановка загрузки

В этой версии программы функция остановки загрузки не используется.

<multimethods>+= (<-U) [<-D->]
(defmulti stop type-dispatch)
Defines stop (links are to index).

<definitions>+= (<-U) [<-D->]
(defmethod stop ::download [dload])

Предикат stopped? проверяет — не остановлен ли агент?

<multimethods>+= (<-U) [<-D->]
(defmulti stopped? type-dispatch)
Defines stopped? (links are to index).

<definitions>+= (<-U) [<-D->]
(defmethod stopped? ::download [dload]
  (deref (:stop-atom dload)))

Базовые действия и предикаты загрузок

Практически все функции для работы с загрузками являются мультиметодами и диспетчеризуются по типу тела агента функцией type-dispatch.

<names>+= (<-U) [<-D->]
type-dispatch

<multimethod dispatch functions>= (<-U)
(defn type-dispatch
  ([x] (type x))
  ([x & xs] (type x)))
Defines type-dispatch (links are to index).

Действие idle переводит агент в бездействующее состояние.

<multimethods>+= (<-U) [<-D->]
(defmulti idle type-dispatch)
Defines idle (links are to index).

<definitions>+= (<-U) [<-D->]
(defmethod idle ::download [dload]
  (assoc dload :failed false :fail-reason nil))

Предикат idle? проверяет — находится ли агент в бездействующем состоянии (это значит, что агент жив, не запущен, не остановлен и не ошибся выполняя предыдущее действие)?

<multimethods>+= (<-U) [<-D->]
(defmulti idle? type-dispatch)
Defines idle? (links are to index).

<definitions>+= (<-U) [<-D->]
(defmethod idle? ::download [dload]
  (and (alive? dload)
       (not (running? dload))
       (not (stopped? dload))
       (not (failed? dload))))

Предикат alive? проверяет — жив ли агент?

<multimethods>+= (<-U) [<-D->]
(defmulti alive? type-dispatch)
Defines alive? (links are to index).

<definitions>+= (<-U) [<-D->]
(defmethod alive? ::download [dload]
  (:alive dload))

Предикат dead? проверяет — мертв ли агент?

<multimethods>+= (<-U) [<-D->]
(defmulti dead? type-dispatch)
Defines dead? (links are to index).

<definitions>+= (<-U) [<-D->]
(defmethod dead? ::download [dload]
  (not (alive? dload)))

Действие fail используется для перевода агента в «ошибочное» состояние. Оно пишет в лог об ошибке и сохраняет возникшее исключение в теле агента.

<multimethods>+= (<-U) [<-D->]
(defmulti fail type-dispatch)
Defines fail (links are to index).

<definitions>+= (<-U) [<-D->]
(defmethod fail ::download [dload & {reason :reason}]
  (error "Error " (represent dload) " reason: " reason)
  (assoc dload :failed true :fail-reason reason))

Предикат failed? проверяет — возникла ли ошибка во время выполнения предыдущего действия?

<multimethods>+= (<-U) [<-D->]
(defmulti failed? type-dispatch)
Defines failed? (links are to index).

<definitions>+= (<-U) [<-D->]
(defmethod failed? ::download [dload]
  (:failed dload))

Селектор fail-reason возвращает исключение которое было брошено при возникновении ошибки в предыдущем действии.

<multimethods>+= (<-U) [<-D->]
(defmulti fail-reason type-dispatch)
Defines fail-reason (links are to index).

<definitions>+= (<-U) [<-D->]
(defmethod fail-reason ::download [dload]
  (:fail-reason dload))

Действие die убивает агент.

<multimethods>+= (<-U) [<-D->]
(defmulti die type-dispatch)
Defines die (links are to index).

<definitions>+= (<-U) [<-D->]
(defmethod die ::download [dload]
  (assoc dload :alive false))

Функция represent наглядно представляет агент в виде строки.

<multimethods>+= (<-U) [<-D->]
(defmulti represent type-dispatch)
Defines represent (links are to index).

<definitions>+= (<-U) [<-D->]
(defmethod represent ::download [{:as dload :keys [name link]}]
  (str "<Download " (apply str (take 30 (or name link))) \>))

Функция performance вычисляет характеристики производительности агента, например, процент загруженного файла, скорость загрузки и прочее.

<multimethods>+= (<-U) [<-D->]
(defmulti performance type-dispatch)
Defines performance (links are to index).

<definitions>+= (<-U) [<-D->]
(defmethod performance ::download
  [{:as dload :keys [total-file-length]}]
  (let [file-length (file-length dload)
        load-percent (when (and (number? total-file-length) (number? file-length)
                                (pos? total-file-length) (pos? file-length))
                       (int (Math/floor (* 100 (/ file-length total-file-length)))))]
    (merge {} (when load-percent {:load-percent load-percent}))))

Селектор file-length возвращает длину в байтах уже скачанного на компьютер файла, если такой ключ :file-length-atom имеет место быть в теле агента.

<multimethods>+= (<-U) [<-D]
(defmulti file-length type-dispatch)
Defines file-length (links are to index).

<definitions>+= (<-U) [<-D->]
(defmethod file-length ::download
  [{:as dload file-length-atom :file-length-atom}]
  (when file-length-atom
    (deref file-length-atom)))

Планировщик загрузок

Планировщик загрузок это агент который обеспечивает совместную работу загрузок друг с другом.

Планировщик представлен глобальной переменной download-scheduler* в которой хранится агент, тело которого — хэш с набором ключей определяющих алгоритм работы планировщика. Большая часть этих ключей используется для отладки планировщика и загрузок. Среди них

:active
включает о отключает планировщик. Ключ используется для отладки программы.

:schedule-with-callback
включает и отключает «callback» — отправку загрузкам специального сообщения callback-download-scheduler для вызова планировщика после завершения выполнения загрузкой действия. Ключ используется для отладки программы.

:last-scheduled
это список загрузок которые были запущены последними. Изменяется после каждого запуска планировщика.

:when-done
этот ключ устанавливается перед запуском планировщика и содержит функцию которая вызывается если нет никаких загрузок которые планировщик мог бы запустить в будущем. Обычно это функция завершения работы программы.

<names>+= (<-U) [<-D->]
download-scheduler*

<definitions>+= (<-U) [<-D->]
(def download-scheduler*
     (agent {:active true
             :when-done nil ;; run when there are no more scheduling job
             :schedule-with-callback true
             :last-scheduled ()}))
Defines download-scheduler* (links are to index).

Основная задача планировщика — определить, с учетом всевозможных ресурсных ограничений, какие загрузки нужно запустить; а затем отправить им сообщение запуска run и сообщение callback-download-scheduler, предписывающее загрузке вызвать планировщик после завершения её работы. Все это осуществляет единственная функция schedule-downloads. Опциональный аргумент callback используется агентом который запускает планировщик для указания себя.

<names>+= (<-U) [<-D->]
schedule-downloads

<definitions>+= (<-U) [<-D->]
(defn schedule-downloads 
  [{:as scheduler :keys [active when-done schedule-with-callback last-scheduled]}
   & {:keys [callback]}]

  {:pre [(when-supplied callback (agent? callback))]}

  (cond
   <when scheduler is deactivated then do not schedule>
   <when there are nothing to shcedule ever then leave>   

   :otherwise
   (let [successors <list of downloads to launch>]
     <launch every download successor>)))
Defines schedule-downloads (links are to index).

  1. Сперва планировщик проверяет, активен ли он; если нет, то на этом его работа завершается.

    <when scheduler is deactivated then do not schedule>= (<-U)
    (not active) ;; then
    (assoc scheduler :last-scheduled ())
    

  2. Если нет ни одной загрузки которую когда-нибудь можно будет запустить — все они мертвы или множество загрузок downloads* пусто, то следует завершить работу планировщика и вызвать функцию when-done, если она есть.

    <when there are nothing to shcedule ever then leave>= (<-U)
    (or (not (seq @downloads*))
        (every?  (comp dead? deref) @downloads*))
    (with-return (assoc scheduler :last-scheduled ())
      (when when-done (when-done)))
    

  3. Составляется список загрузок для запуска — successors. Для этого множество всех загрузок downloads* разбивается на группы по типу; каждая группа загрузок сортируется в порядке появления каждой из загрузок в программе; все это сохраняется в хэше groups. Для каждой группы в groups определяются загрузки которые будут запущены; затем все они объединяются в один список.

    <list of downloads to launch>= (<-U)
    (let [groups
          (into {} (for [[dloads-type dloads] (group-by (comp type deref) @downloads*)]
                     {dloads-type (sort-by (comp :precedence deref) dloads)}))]
    (flatten
    (for [[dloads-type dloads] groups
          :let [max-running-dloads ;; maximum amount of running
          ;; downloads for this type of downloads
          (:max-running-downloads (dloads-type @download-prototypes*))
          running-dloads (filter (comp running? deref) dloads)
          count-of-running-dloads (count running-dloads)
          idle-dloads (filter (comp idle? deref) dloads)
          failed-dloads (filter (comp failed? deref) dloads)
          count-of-dloads-to-launch (- max-running-dloads count-of-running-dloads)]]
      (cond
        ;; if no downloads in group then leave
        (not (seq dloads)) ()
    
        ;; if all downloads in group is dead then leave
        (every? (comp dead? deref) dloads) ()
    
        ;; if there are more running downloads then it might be for
        ;; that group then leave
        (<= count-of-dloads-to-launch 0) ()
    
        ;; schedule downloads from scratch
        (not callback)
        (take count-of-dloads-to-launch (concat idle-dloads failed-dloads))
    
        ;; some download ask to schedule downloads
        callback
        (take count-of-dloads-to-launch
              (concat idle-dloads (take-entirely-after callback failed-dloads)))))))
    

  4. Каждой загрузке из списка на запуск successors отсылается сообщение запуска run и в зависимости от значения ключа schedule-with-callback, сообщение для запуска планировщика после того как загрузка выполнит действие.

    <launch every download successor>= (<-U)
    (with-return (assoc scheduler :last-scheduled successors)
      (doseq [successor successors]
        (send-off successor run)
        (when schedule-with-callback
          (send-off successor callback-download-scheduler))))
    

* Функция callback-download-scheduler запускает планировщик загрузок «от лица» загрузочного агента.

<names>+= (<-U) [<-D->]
callback-download-scheduler

<definitions>+= (<-U) [<-D->]
(defn callback-download-scheduler [dload]
  (with-return dload
    (when *agent*
      (send-off download-scheduler* schedule-downloads :callback *agent*))))
Defines callback-download-scheduler (links are to index).

Макрос when-supplied проверяет опциональные аргументы, если они заданы.

<macros>+= (<-U) [<-D->]
(defmacro when-supplied [& clauses]
  (if-not clauses true
          `(and (or (nil? ~(first clauses))
                    (do ~(second clauses)))
                (when-supplied ~@(next (next clauses))))))
Defines when-supplied (links are to index).

Предикат agent? проверяет — является ли его аргумент агентом?

<names>+= (<-U) [<-D->]
agent?

<definitions>+= (<-U) [<-D->]
(defn agent? [x]
  (instance? clojure.lang.Agent x))
Defines agent? (links are to index).

Функции семейства take- выбирают элементы последовательности перед заданным элементом — take-before, после заданного элемента — take-after и все элементы после и перед элементом включая и сам элемент — take-entirely-after (наивная реализация).

<names>+= (<-U) [<-D->]
take-after take-before take-entirely-after

<definitions>+= (<-U) [<-D->]
(defn take-after [item coll]
  (rest (drop-while (partial not= item) coll)))

(defn take-before [item coll]
  (take-while (partial not= item) coll))

(defn take-entirely-after [item coll]
  (let [after (take-after item coll)
        before (take-before item coll)]
    (concat after before
            (when-not (= (count before) (count coll))
              (list item)))))
Defines take-after, take-before, take-entirely-after (links are to index).

Монитор прогресса

Монитор это агент который наглядно отображает ход работы программы. Он определен в глобальной переменной progress-monitor*. Тело агента представлено хэшем с расчетом на будущее расширение; оно содержит единственный ключ :agents, его значением является множество агентов которые в данный момент отображаются в полоске прогресса в консоли. Для добавления и удаления агентов из этого множества используются функции begin-monitor-progress и cease-monitor-progress.

<names>+= (<-U) [<-D->]
progress-monitor*

<definitions>+= (<-U) [<-D->]
(def progress-monitor* (agent {:agents #{}}))
Defines progress-monitor* (links are to index).

<names>+= (<-U) [<-D->]
begin-monitor-progress cease-monitor-progress

<definitions>+= (<-U) [<-D->]
(defn begin-monitor-progress [{:as progress-monitor agents :agents} agnt]
  {:pre (agent? agnt)}
  (assoc progress-monitor :agents (union agents (hash-set agnt))))

(defn cease-monitor-progress [{:as progress-monitor agents :agents} agnt]
  {:pre (agent? agnt)}
  (.print System/out (str "\r" <eighty-spaces> "\r"))
  (assoc progress-monitor :agents (difference agents (hash-set agnt))))
Defines begin-monitor-progress, cease-monitor-progress (links are to index).

eighty-spaces это просто 80 пробелов в ширину консоли.

<eighty-spaces>= (<-U)
"                                                                                "

Включение и отключение отображения прогресса загрузочного агента обычно происходит в одном его действии, поэтому имеет смысл использовать для этого макрос with-progress-monitoring который об этом позаботится.

<macros>+= (<-U) [<-D]
(defmacro with-progress-monitoring [agnt & body]
  `(let [agnt?# (agent? ~agnt)]
     (try (when agnt?# (send-off progress-monitor* begin-monitor-progress ~agnt))
          (do ~@body)
          (finally (when agnt?# (send-off progress-monitor* cease-monitor-progress ~agnt))))))
Defines with-progress-monitoring (links are to index).

Для отрисовки полосы прогресса в консоли нужно отослать монитору прогресса progress-monitor* сообщение show-progress. Отправка этого сообщения во время выполнения загрузочным агентом действия должна сопровождаться вызовом функции release-pending-sends из-за того что агенты в Clojure задерживают все отправляемые во время действия сообщения до завершения действия.

<names>+= (<-U) [<-D->]
show-progress

<definitions>+= (<-U) [<-D->]
(defn show-progress [{:as progress-monitor agents :agents}]
  (with-return progress-monitor
    (.print System/out \return)
    (doseq [abody (map deref agents)
            :let [name (:name abody)
                  name-length (if (string? name) (count name) nil)
                  perf (performance abody)
                  load-percent (:load-percent perf)]]
      (.print System/out (str \[ (cond (not name) \-
                                       (< name-length 12) name
                                       :longer (str (.substring name 0 5) \. \.
                                                    (.substring name (- name-length 7) name-length)))
                              \space (or load-percent \0) \% \])))
    (.print System/out \return)))
Defines show-progress (links are to index).

Прототипы загрузок для конкретных сервисов

Ниже идут определения прототипов и программ загрузок для скачивания файлов с конкретных адресов и файлообменников.

Сетевой код довольно уродливый. Я использую родную для Java библиотеку Apache HTTP Client за неимением лучшего. Все родные для Clojure HTTP-клиенты гораздо менее качественны.

В большинстве HTTP-запросов я использую заранее определенные величины таймаутов соединений и размера буфера.

<names>+= (<-U) [<-D->]
timeout-after-fail* connection-timeout* get-request-timeout* head-request-timeout* buffer-size*

<definitions>+= (<-U) [<-D->]
(def timeout-after-fail* 3000)
(def connection-timeout* 15000)
(def get-request-timeout* 30000)
(def head-request-timeout* 30000)
(def buffer-size* 65536)
Defines buffer-size*, connection-timeout*, get-request-timeout*, head-request-timeout*, timeout-after-fail* (links are to index).

files*.dsv.*.data.cod.ru

Прототипы загрузок файлов по прямым ссылкам с местного файлообменника. С каждого из адресов можно качать в один поток.

Алгоритм программы загрузки очень прост:

  1. Функция files*-dsv-*-data-cod-ru-get-head отправляет на файлообменник запрос HEAD, из него узнается размер и имя файла.

  2. Функция get-local-file выбирает локальный файл в который она будет загружать файл с сервера.

  3. Если файл уже полностью загружен или на диске для него нет места — загрузка умирает, если все в порядке — функция begin-download начинает загрузку.

<names>+= (<-U) [<-D->]
files*-dsv-*-data-cod-ru-download-program

<definitions>+= (<-U) [<-D->]
(defn files*-dsv-*-data-cod-ru-download-program
  [{:as dload :keys [link name file path total-file-length]}]
  (cond (not link) die
        (not (and name total-file-length)) files*-dsv-*-data-cod-ru-get-head
        (not file) get-local-file
        (or (out-of-space-on-path? dload) (fully-loaded? dload)) die
        :requirements-ok begin-download))
Defines files*-dsv-*-data-cod-ru-download-program (links are to index).

Функция files*-dsv-*-data-cod-ru-get-head отправляет на файлообменник запрос HEAD, из него узнается размер и имя файла.

<names>+= (<-U) [<-D->]
files*-dsv-*-data-cod-ru-get-head

<definitions>+= (<-U) [<-D->]
(defn files*-dsv-*-data-cod-ru-get-head [{:as dload :keys [link name]}]
  {:pre [(supplied link)]}
  (let [client (new HttpClient)
        head (new HeadMethod link)]
    (.. client getHttpConnectionManager getParams 
        (setConnectionTimeout connection-timeout*))
    (.. head getParams (setSoTimeout head-request-timeout*))
    (try (let [status (.executeMethod client head)]
           (if (= status HttpStatus/SC_OK)
             (let [content-length (.. head (getResponseHeader "Content-Length"))
                   content-disposition (.. head (getResponseHeader "Content-Disposition"))]
               (if-not (or content-length content-disposition) (die dload)
                 (let [length (Integer/parseInt (.getValue content-length))
                       filename (URLDecoder/decode (second (re-find #"; filename=\"(.*)\"" (.getValue content-disposition))))]
                   (if-not (and length filename) (die dload)
                     (assoc dload
                       :total-file-length length
                       :file-length-atom (atom 0)
                       :name (or name filename))))))
             (throw (Exception. "HEAD request failed."))))
         (finally (.releaseConnection head)))))
Defines files*-dsv-*-data-cod-ru-get-head (links are to index).

Функция get-local-file выбирает локальный файл в который она будет загружать файл с сервера.

<names>+= (<-U) [<-D->]
get-local-file

<definitions>+= (<-U) [<-D->]
(defn get-local-file [{:as dload :keys [name path]}]
  {:pre [(supplied name path)]}
  (assoc dload :file (new File path name)))
Defines get-local-file (links are to index).

Предикат out-of-space-on-path? проверяет — хватает ли места на локальном диске для загрузки файла?

<names>+= (<-U) [<-D->]
out-of-space-on-path?

<definitions>+= (<-U) [<-D->]
(defn out-of-space-on-path? [{:as dload :keys [path file total-file-length]}]
  {:pre [(supplied path file total-file-length)]}
  (if (.exists file)
    (< (.getUsableSpace path) (- total-file-length (.length file)))
    (= (.getUsableSpace path) 0)))
Defines out-of-space-on-path? (links are to index).

Предикат fully-loaded? проверяет — загружен ли файл полностью?

<names>+= (<-U) [<-D->]
fully-loaded?

<definitions>+= (<-U) [<-D->]
(defn fully-loaded? [{:as dload :keys [file total-file-length]}]
  {:pre [(supplied file total-file-length)]}
  (boolean (and (.exists file) (<= total-file-length (.length file)))))
Defines fully-loaded? (links are to index).

Функция begin-download скачивает файл.

<names>+= (<-U) [<-D->]
begin-download

<definitions>+= (<-U) [<-D->]
(defn begin-download
  [{:as dload :keys [name link file total-file-length file-length-atom]}]
  {:pre [(supplied name link file total-file-length)]}
  (let [client (HttpClient.)
        get (GetMethod. link)]
    (try
      (.. client getHttpConnectionManager getParams 
          (setConnectionTimeout connection-timeout*))
      (.. get getParams (setSoTimeout get-request-timeout*))
      (.setRequestHeader get "Range" (str "bytes=" (actual-file-length file) \-))
      (.executeMethod client get)
      (let [content-length (.getResponseContentLength get)]
        (cond (not content-length)
              (throw (Exception. "Cannot check file length before download."))

              (not= content-length (- total-file-length (actual-file-length file)))
              (throw (Exception. "Downloading file size mismatch."))

              :else
              (with-return dload
                (with-open [#^InputStream input (.getResponseBodyAsStream get)
                            #^FileOutputStream output (FileOutputStream. file true)]
                  (info "Begin download " name)
                  (with-progress-monitoring *agent*
                    (let [buffer (make-array Byte/TYPE buffer-size*)]
                      (loop [file-size (actual-file-length file)]
                        (let [read-size (.read input buffer)]
                          (when (pos? read-size)
                            (let [new-size (+ file-size read-size)]
                              (.write output buffer 0 read-size)
                              (reset! file-length-atom new-size)
                              (when *agent*
                                (send-off progress-monitor* show-progress)
                                (release-pending-sends))
                              (when-not (stopped? dload)
                                (recur new-size))))))))
                  (.flush output)
                  (info "End download " name)))))
      (finally (.releaseConnection get)))))
Defines begin-download (links are to index).

Функция actual-file-length просто возвращает размер файла в байтах если он есть и 0 — если его нет.

<names>+= (<-U) [<-D->]
actual-file-length

<definitions>+= (<-U) [<-D->]
(defn actual-file-length [file]
  (if (.exists file) (.length file) 0))
Defines actual-file-length (links are to index).

Прототипы загрузок.

<names>+= (<-U) [<-D->]
files*-dsv-*-data-cod-ru files3?-dsv-*-data-cod-ru files2-dsv-*-data-cod-ru
files3?-dsv-region-data-cod-ru files2-dsv-region-data-cod-ru

<definitions>+= (<-U) [<-D->]
(def files*-dsv-*-data-cod-ru
     {:program files*-dsv-*-data-cod-ru-download-program
      :max-running-downloads 1
      :link nil
      :name nil
      :path nil
      :file nil
      :total-file-length nil})

(def-download-prototype files3?-dsv-*-data-cod-ru
  (assoc files*-dsv-*-data-cod-ru :link-pattern #"http://files3?.dsv.*.data.cod.ru/.+"))

(def-download-prototype files2-dsv-*-data-cod-ru
  (assoc files*-dsv-*-data-cod-ru :link-pattern #"http://files2.dsv.*.data.cod.ru/.+"))

(def-download-prototype files3?-dsv-region-data-cod-ru
  (assoc files*-dsv-*-data-cod-ru :link-pattern #"http://files3?.dsv-region.data.cod.ru/.+"))

(def-download-prototype files2-dsv-region-data-cod-ru
  (assoc files*-dsv-*-data-cod-ru :link-pattern #"http://files2.dsv-region.data.cod.ru/.+"))
Defines files*-dsv-*-data-cod-ru, files2-dsv-*-data-cod-ru, files2-dsv-region-data-cod-ru, files3?-dsv-*-data-cod-ru, files3?-dsv-region-data-cod-ru (links are to index).

data.cod.ru

Прототип загрузки для пользовательской страницы файлообменников data.cod.ru.

Программа загрузки предельно проста: она вырезает из страницы файлообменника прямую ссылку на файл и создает дочернюю загрузку с прямой ссылкой.

<names>+= (<-U) [<-D->]
data-cod-ru-download-program

<definitions>+= (<-U) [<-D->]
(defn data-cod-ru-download-program [{:as dload :keys [link child-link child]}]
  (cond (not link) data-cod-ru-parse-page
        (not child) data-cod-ru-make-child-download
        :finally die))
Defines data-cod-ru-download-program (links are to index).

Функция data-cod-ru-parse-page вырезает из страницы прямую ссылку на файл.

<names>+= (<-U) [<-D->]
data-cod-ru-parse-page

<definitions>+= (<-U) [<-D->]
(defn data-cod-ru-parse-page [{:as dload :keys [link]}]
  {:pre [(supplied link)]}
  (let [client (new HttpClient)
        get (new GetMethod link)]
    (.. client getHttpConnectionManager getParams 
        (setConnectionTimeout connection-timeout*))
    (.. get getParams (setSoTimeout get-request-timeout*))
    (try (let [status (.executeMethod client get)]
           (if (= status HttpStatus/SC_OK)
             (let [child-link (re-find #"http://files[-\d\w\.]*data.cod.ru/[^\"]+"
                                       (slurp* (.getResponseBodyAsStream get)))]
               (if child-link
                 (assoc dload :child-link child-link)
                 (die dload)))
             (throw (Exception. "Fail to parse page."))))
         (finally (.releaseConnection get)))))
Defines data-cod-ru-parse-page (links are to index).

Функция data-cod-ru-make-child-download создает дочернюю загрузку если загрузки с такой ссылкой еще нет.

<names>+= (<-U) [<-D->]
data-cod-ru-make-child-download

<definitions>+= (<-U) [<-D->]
(defn data-cod-ru-make-child-download [{:as dload :keys [link child child-link path]}]
  {:pre [(supplied link child-link path)]}
  (if child dload
    (let [child (or (first (for [dl @downloads* :when (= child-link (:link @dl))] dl))
                (make-download child-link :path path))]
      (if-not child (die dload)
        (assoc dload :child child)))))
Defines data-cod-ru-make-child-download (links are to index).

Объявление прототипа.

<names>+= (<-U) [<-D]
data-cod-ru

<definitions>+= (<-U) [<-D]
(def-download-prototype data-cod-ru
  {:link-pattern #"http://[\w\-]*.data.cod.ru/\d+"
   :program data-cod-ru-download-program
   :link nil
   :path nil
   :child-link nil
   :child nil})
Defines data-cod-ru (links are to index).

Послесловие

Поздравляю. Надеюсь, вы не зря потратили свое время, если смогли дочитать до этого места. Не стесняйтесь использовать и дополнять эту программу своим кодом, она хранится на гитхабе в моем репозитории http://github.com/zahardzhan/leica.