Ну ка Thread'ы, встаньте в ряд!

 
0
 
Java
ava
Hidrag | 13.12.2007, 11:43
Пишу софтинку переноса данных из одной схемы БД в другую, СУБД разные.
Схема работы такая селектами подтягиваю данные из одной БД, трансформирую их как нужно, создаю из этих данный большой sql блок инсертов и стартую поток для коммита этих данный в бд приемник.

Класс потока выглядит так:

package util;

import MMApp.MainForm;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.log4j.Logger;

public class StartInsert extends Thread {

String s=null;

public StartInsert(String sql,int pri) {
this.s = new String(sql);
this.setPriority(pri);
}

public void run() {
ooo(s);
}

public static synchronized void ooo(String s) {
Connection con = null;
Statement st = null;
try {
con = DBManager.getInstance().sybaseConnect;
st = con.createStatement();
st.addBatch(s);
st.executeBatch();
con.commit();
} catch (SQLException ex) {
try {
con.rollback();
st.close();
} catch (SQLException eeex) {
MainForm.logger.error(eeex);
}
MainForm.logger.error("ОШИБКА: Ошибка выполнения globalStatement",ex);
MainForm.logger.error(s);
}
}
}


затем получаю следующий блок данных из бд источника, тоже их как то трансформирую, как только закончу все трансформации стартую новый поток. Сделано так для того, чтобы не дожидаться коммита в бд приемник а сразу после завершения трансформации одного блока приступать к другому, поскольку СУБД приемника может быть загружена и инсерты с коммитом будут выполняться долго.

НО часто так бывает что инсерт следующего блока будет не возможен по ограничениям целостности, без коммита предыдущего блока. Если на однопроцессорной машине можно как то выстроить потоки благодаря приоритетам, то на многопроцессорной машине думаю будут проблемы, когда возможно ситуация, что следующий поток получит доступ к базе раньше чем отработал предыдущий и попытается закомитится, что вызовет ошибку. Так часто и случалось даже на однопроцессорной машине, без применения приоритетов.

Вопрос, как "построить" потоки, чтобы каждый созданный новый поток получил доступ к бд, только после того как отработает поток созданный перед ним, какие будут мысли?

Если не понятно обьяснил, напишите, постараюсь подробней....
Ответы (11)
ava
AlexeyVorotnikov | 13.12.2007, 11:53 #
Использовать приоритеты для синхронизации потоков -- изначально порочная идея при любом количестве процессоров. Почитайте какие-нибудь книжки или статьи про синхронизацию потоков в Java. Обратите внимание на методы Object.wait(), Object.notify(), Object.notifyAll(), Thread.join(). А также на synchronized методы и блоки.
ava
ivg | 13.12.2007, 12:55 #
Цитата (Hidrag @ 13.12.2007, 11:43 findReferencedText)
инсерт следующего блока будет не возможен по ограничениям целостности, без коммита предыдущего блока

Могу ошибаться, но по моему, если оба инсерта выполняются в одной сессии, а точнее транзакции, то исключений возникать не должно. Может для разных СУБД по разному, у Oracle, например вроде так.
Цитата (Hidrag @ 13.12.2007, 11:43 findReferencedText)
Вопрос, как "построить" потоки, чтобы каждый созданный новый поток получил доступ к бд, только после того как отработает поток созданный перед ним, какие будут мысли?

Вот если строгая последовательность выполнения одного задания за другим, то по моему, здесь не нужно несколько потоков, один справится. Ставите ему задания в очередь, а поток их выполняет ( см. java.util.concurrent.BlockingQueue (PriorityBlockingQueue)). Лучше в одной сессии, т. е. коннекшен каждый раз открывать накладно.
Что же касается заданий, которые могут быть выполнены без приоритета и очерёдности, то тут то и можно использовать несколько потоков, организовав их в пул. ( см. java.util.concurrent.ThreadPoolExecutor ). Хотя может, можно и без пула. Тут тоже какая нибудь BlockingQueue, в которую ставятся задания. Каждый поток извлекает из очереди задание, и выполняет его.
Примерно так.
PS: посмотрите, что ещё есть в пакете java.util.concurrent
ava
ecologist | 13.12.2007, 13:02 #
"Строить" потоки - дело неблагодарное и ужасно по своим последствиям.

AlexeyVorotnikov тебе по идее правильно предложил - используй блокировки.

Но по-моему изначально задача спроектирована. Ну пусть выполняется последовательно и долго - не вижу в этом проблемы. Важно, чтобы целостность и стабильность работы соблюдалась. И однозначность исполнения. Т.е. команды выполняются в одном и том же порядке при каждом запуске. А при тредовом варианте получается никаких гарантий. Нафиг такое надо ?
ava
Hidrag | 13.12.2007, 13:04 #
Конекш открывается при запуске и закрывается когда вся конвертация выполнена.
Инсерты выполняются в разных транзакциях, поэтому могут быть ошибки.

А вот идея не плодить сотни потоков, а создать один и давать ему задания поочередно - хорошая идея, думаю...
Строгой последовательности ВСЕХ данных нет, например есть 100 сформированных блоков, 99 из них могут зависить всего от одного, поэтому он создается и стартует ДО того как будет дан старт зависимым блокам, но бывает что кто то из тех 99 получит коннект до того как тот один закомитится и упадет по констрейнтам :(

смысл создания многопоточного приложения был в том чтобы время конвертации данных не зависело от времени их вставки в бд.
ava
ivg | 13.12.2007, 13:35 #
Цитата (ecologist @ 13.12.2007, 13:02 findReferencedText)
"Строить" потоки - дело неблагодарное и ужасно по своим последствиям.

Волков бояться - в лес не ходить. :biggrin
Цитата (Hidrag @ 13.12.2007, 13:04 findReferencedText)
Конекш открывается при запуске и закрывается когда вся конвертация выполнена.

Один коннекшн на несколько потоков?
Вот тут то точно трудности возникнут. Может в каких то ситуациях, один коннекшн и можно разделять между несколькими потоками, тогда надо чётко представлять как работает конкретная реализация интерфейса, решение сразу становится не переносимым, проще не заморачиваться и использовать пул. Ну или строгая привязка по одному коннекшену на поток.
ava
Hidrag | 13.12.2007, 14:28 #
да нам и не нужна переносимость, софтинка пишется для одной задачи, есть несколько десятков бд, данные из которых нужно перевести на новую субд и все. То есть одноразовая задача.
Проблем возникнуть не должно, по скольку

public static synchronized void ooo(String s) {


и


public static synchronized DBManager getInstance() {
ava
ivg | 13.12.2007, 15:52 #
Цитата (Hidrag @ 13.12.2007, 14:28 findReferencedText)




public static synchronized void ooo(String s) {




Так у вас тогда никакого выигрыша от многопоточности не будет.
Цитата (Hidrag @ 13.12.2007, 11:43 findReferencedText)
стартую новый поток. Сделано так для того, чтобы не дожидаться коммита в бд приемник а сразу после завершения трансформации одного блока приступать к другому, поскольку СУБД приемника может быть загружена и инсерты с коммитом будут выполняться долго.

Смотрите. Поскольку у вас статик метод объявлен synchronized, то синхронизация будет происходить на мониторе Класса StartInsert, не на конкретном экземпляре этого класса который вы создаете, когда стартуете новый поток, а на instance самого класса, который per JVM, per ClassLoader существует в единственном экземпляре. Пока поток выполняет длительную операцию (st.executeBatch(); или con.commit();) он занял монитор и другие потоки, которые в это время должны работать ждут освобождения монитора у входа в метод ooo(String s), монитор освободится, когда поток покинет метод. С таким же успехом, вся работа могла бы выполняться в одном потоке, который бы просто брал задания из очереди и выполнял их.
Если убрать static или synchronized у этого метода, то получим грабли с расшаренным коннекшеном.
Можно вот такой вариант предложить:
В классе StartInsert создать поле для хранения собственного экземпляра Connection для потока, его создавать и передавать в конструктор, либо через DBManager получать новый экземпляр. Необходимое кол-во потоков создать при старте приложения, а не создавать их каждый раз при появлении задания. Каждый такой поток будет работать в цикле, в котором выбирает задания из BlockingQueue и исполнять их. Предусмотреть сигнализацию, по которой поток выйдет из цикла. Проще всего это сделать помещая в очередь, какое-либо определённое сообщение. Здесь сообщений таких по кол-ву потоков.
Вот, в общих чертах.
ava
Hidrag | 13.12.2007, 16:15 #
ivg, Спасибо!
передалаю наверное в один поток которому буду в очередь класть инсерты, а он в цикле будет проверять эту очередь, и при наличии там элементов выполнять их.

Чисто из любопытства, а если каждому потоку в моем случае давать свой коннекшн, например из пула ( а в пуле их сделать штук 100) повысит ли это производительность, при этом предварительно прогрузив блоки от которых могут зависить другие...
ava
ivg | 13.12.2007, 16:44 #
Вообще это целиком и полностью зависит от конкретных условий, я же исходил из этого
Цитата (Hidrag @ 13.12.2007, 11:43 findReferencedText)
поскольку СУБД приемника может быть загружена и инсерты с коммитом будут выполняться долго

Если у вас скорость получения данных из БД источника и их трансформации существенно быстрее, чем скорость их вставки в БД приёмника, если БД приёмника обеспечивает условия параллельной работы сессий, т. е. если работа в n-параллельных сессиях выполняется быстрее, чем n-работ в одной сессии, тогда скорей всего производительность возрастёт (должна :biggrin ) Но тут лучше на конретных примерах экспериментально выяснить этот вопрос.
PS: Мысль - поиграться с разными уровнями изоляции транзакции. Тоже влияет. Правда зависит от СУБД.
ava
COVD | 13.12.2007, 16:52 #
Цитата


...... чтобы не дожидаться коммита в бд приемник а сразу после завершения трансформации одного блока приступать к другому..... инсерт следующего блока будет не возможен по ограничениям целостности, без коммита предыдущего блока.



один SingleThreadExecutor читает данные, готовит задания и передает их второму SingleThreadExecutor , который пишет в базу. Если два задания составляют транзакцию, т.е. второе задание не имеет смысла без первого, то такие задания надо обьединять в одно в промежуточном буфере.
ava
Stampede | 14.12.2007, 20:04 #
Цитата (Hidrag @ 13.12.2007, 06:15 findReferencedText)
передалаю наверное в один поток которому буду в очередь класть инсерты, а он в цикле будет проверять эту очередь, и при наличии там элементов выполнять их.


Да, при заданной постановке задачи это будет, пожалуй, самым правильным решением. Коль скоро и трансформация исходных данных, и вставка подготовленных блоков занимают определенное время, допустим, M и N соответственно, то при последовательном выполнении суммарное время будет равно M + N, а при использовании очереди - где то примерно max (M, N).

Для повышения скоррости вставки можно также посоветовать формировать пакеты при помощи (Prepared)Statemenet.addBatch() и Statement.executeBatch(), но реально это сэкономит только сетевые раунд-трипы, так что при близко (в сетевом расстоянии) расположенной базе выигрыш будет невелик. Тем более если все это всего лишь одноразовая заморочка. Но вообще знать про пакетный режим не помешает.
Зарегистрируйтесь или войдите, чтобы написать.
Фирма дня
Вы также можете добавить свою фирму в каталог IT-фирм, и публиковать статьи, новости, вакансии и другую информацию от имени фирмы.
Подробнее
Участники
advanced
Отправить