1 Востаннє редагувалося vitia444 (14.06.2015 02:47:28)

Тема: Мультипотоковість

Є в мене ділянка коду, яку я хочу розпаралелити (зараз вона працює правильно)

LinkedListNode<Homosapien> current = population.First;
            LinkedListNode<Homosapien> next;
            
            while (current != null)
            {
                next = current.Next;

                current.Value.MakeStep(
                        GetMatrixConversion(current.Value.Age.Year,
                                            today.Month));                

                current = next;
            }

Додаткова інформація: MakeStep іноді викидує подію, що current помер, тоді він видаляється зі списку:

        public void MakeStep(double[][] Prob)
        {
            if (hystori.Last.Value.state == State.Dead)
                throw new InvalidOperationException("Individ is dead");

            State newState = (State)(GetRandIndex(Prob[(int)hystori.Last.Value.state]));

            if (newState != hystori.Last.Value.state)
            {
                hystori.AddLast(new Record(newState, today));
            }

            today.MoveNext();
            age.MoveNext();
            
            if (newState != State.Dead)
            {
                CanBorn();
            }
            else
            {
                RaiseImDie();
            }
        }

Розбив список, припустимо, на 4 частини, кожну пустив у потік, викинуло помилку, мов я оброблюю мертвого індивіда... Ні, проблема не в тому, що я покаліцьки розбив список, бо наступний код викидує ту ж помилку

            while (current != null)
            {
                next = current.Next;
                Task.Factory.StartNew(() =>
                {
                    current.Value.MakeStep(
                        GetMatrixConversion(current.Value.Age.Year,
                                            today.Month));
                });
                current = next;
            }

Питання - якого дива два потоки заходять в один об'єкт, коли я їм не кажу цього робити?

2

Re: Мультипотоковість

Намагався щось зрозуміти з пояснення, але не подужав ...

3 Востаннє редагувалося vitia444 (17.06.2015 21:51:57)

Re: Мультипотоковість

Ну ладно, про той випадок забули, зараз наступне:

        public void MakeStepForPopulation()
        {
            LinkedListNode<Homosapien> current = population.First;//мій список
            LinkedListNode<Homosapien> first;

            long interval = population.Count / THREAD_COUNT;
            go = false;//кажу, щоб жоден поток не почав свою роботу
            for (int i = 0; i < THREAD_COUNT - 1; i++)
            {//проходить роздача параметрів потокам
                first = current;
                for (long j = 0; j < interval; j++)
                    current = current.Next;

                threads[i] = Task.Run(() =>
                        {
                            MakeStepForPopulation_thread(first, interval);
                        });
            }
//останній потік в мене особливий, має починатись так
            threads[THREAD_COUNT - 1] = Task.Run(() =>
            {
                MakeStepForPopulation_thread(current, -1);
            });
//даю добро потокам
            go = true;
//чекаю потоки
            Task.WaitAll(threads);
        }

//власне, що роблять потоки
        public void MakeStepForPopulation_thread(LinkedListNode<Homosapien> current, long interval)
        {//тут потоки чекають
            while (!go)
                System.Threading.Thread.Sleep(10);

            LinkedListNode<Homosapien> next;
//тут іде обробка списку
            while (!(interval == 0 || current == null))
            {
                next = current.Next;
//MakeStep описаний повідомленням вище. Там є кілька особливих операцій, які змінюють головний список.
                current.Value.MakeStep(
                    GetMatrixConversion(current.Value.Age.Year,
                                        today.Month));
//відраховується лічильник
                interval--;
                current = next;
            }
        }

Тут є список елементів, я хочу розбити цей список на THREAD_COUNT, та пустити частини у обробку.
Наприклад, є список 1 2 3 4 5 6 7 8, THREAD_COUNT = 4, інтервал стане рівний 2 (по 2 елементи на поток), і мало б у перший поток на вхід податись (1, 2), на другий (3, 2), на третій (5, 2), і на останній (7, -1) (зроблено для того, щоб останній поток проходив не кокретну кількість елементів, а просто йшов до кінця, на це є певні причини).

Також я хочу, щоб всі потоки почали роботу після "роздачі завдань", тобто передачі параметрів (для уникнення стану гонки). Якщо тут є кращий спосіб - кажіть

Так от, в чому проблема. Проблема в тому, що по факту 3 потоки почались із 5 (а мали із 1 3 5), і один із 7. Тобто - перші три потоки брали параметри в циклі, де

                threads[i] = Task.Run(() =>
                        {
                            MakeStepForPopulation_thread(first, interval);
                        });

ну і останній брався окремо, то й получив все правильно. змінна first вийшла для всіх однаковою, ось в цьому проблема. В момент роздачі параметрів вона була різною. Чому так і як від цього позбутись?

4

Re: Мультипотоковість

Ви просто забули, що тут:

 MakeStepForPopulation_thread(first, interval);

і тут:

first = current;

змінна first - той самий об'єкт. На момент початку виконання, мабуть, він вказує на 5-й елемент Вашої population.
Якщо хочете щось подібне реалізувати, кожет потік повинен одержати власну копію, яку тільки він може змінювати, незалежно від інших трейдів.
Щоб позбавитись головного болю та не блукати у власному лабіринті, забудьте тут про ноди, простота - ґарантія безпеки:

Homosapien[] homoarray = population.ToArray();

, і MakeStepForPopulation_thread одержує цей масив, індекс початку для даного трєйда та кількість елементів для обробки.

5 Востаннє редагувалося vitia444 (17.06.2015 22:29:44)

Re: Мультипотоковість

mich_retten написав:

Ви просто забули, що тут:

 MakeStepForPopulation_thread(first, interval);

і тут:

first = current;

змінна first - той самий об'єкт. На момент початку виконання, мабуть, він вказує на 5-й елемент Вашої population.
Якщо хочете щось подібне реалізувати, кожет потік повинен одержати власну копію, яку тільки він може змінювати, незалежно від інших трейдів.

Бачте но, є 2 "але"
1. вони і мають отримувати власну копію, у локальну змінну. змінна в головному циклі змінюється, але ссилка на якийсь вузол копіюється в параметр методу (тобто мало б так бути, але так не є)
2. робив варіант без потоків, тоді все працювало, тобто зміна змінної у методі не змінювала змінної у головному циклі

Щоб позбавитись головного болю та не блукати у власному лабіринті, забудьте тут про ноди, простота - ґарантія безпеки:

Homosapien[] homoarray = population.ToArray();

, і MakeStepForPopulation_thread одержує цей масив, індекс початку для даного трєйда та кількість елементів для обробки.

Тут теж є своє "але"... під час обробки списку здійснюється його зміна (додавання-видалення елементів), для масиву так не вийде, плюс дууууже довгий цей список (1 000 000 елементів)

6

Re: Мультипотоковість

Може, такий трік виправить ситуацію, але не певен. Спробуйте:

                threads[i] = Task.Run(() => {
                            LinkedListNode<Homosapien> arg = first;
                            MakeStepForPopulation_thread(arg, interval);
                        });

7

Re: Мультипотоковість

mich_retten написав:

Може, такий трік виправить ситуацію, але не певен. Спробуйте:

                threads[i] = Task.Run(() => {
                            LinkedListNode<Homosapien> arg = first;
                            MakeStepForPopulation_thread(arg, interval);
                        });

робив, не виправило

8

Re: Мультипотоковість

Тобто у Вас є список, який Ви хочете обробляти декількома потоками, при цьому змінювати. Так?
Ви не можете цього зробити (надійно) з зв'язаним списком, бо msdn стверджує, що клас не є потокобезпечним (thread safe):

The LinkedList<T> class does not support chaining, splitting, cycles, or other features that can leave the list in an inconsistent state. The list remains consistent on a single thread. The only multithreaded scenario supported by LinkedList<T> is multithreaded read operations.
тобто Ви не можете змінювати такий список у потоках.

Підемо від простого: Ви маєте список з 4х елементів, два потоки "одержали" по два елементи (так ми гадаємо).
Перший поток вирішив, що треба видалити елемент після останнього (у його частки), а другий - видалити елемент перед першим (з його частки). Що повинно відбутися? Синхронізації у Вас не передбачено (якщо виключити цю маґічну змінну "go").
Може, розбити цей список на декілька автономних, обробити в потоках, а потім злити в один?
Я не знаю повністю Вашої задачі, просто спробую навести Вас самого на якесь прийнятне рішення.

Вибачте, може, це зайве пояснення, якого Ви не потребуєте, але я приведу аналогію.
Обробка у потоках - це як обробка кількома особами однієї деталі одночасно. Якщо не синхронізувати роботу цих "робітників", то кожен з них робить свою справу ніяк не зважаючи на інших. Обов'язково виникнуть конфлікти. Один з робітників вибирає для обрабки "наступний елемент", починає обробляти, і у цей час інший видаляє цей елемент (як приклад).

9

Re: Мультипотоковість

mich_retten написав:

Тобто у Вас є список, який Ви хочете обробляти декількома потоками, при цьому змінювати. Так?
Ви не можете цього зробити (надійно) з зв'язаним списком, бо msdn стверджує, що клас не є потокобезпечним (thread safe):

The LinkedList<T> class does not support chaining, splitting, cycles, or other features that can leave the list in an inconsistent state. The list remains consistent on a single thread. The only multithreaded scenario supported by LinkedList<T> is multithreaded read operations.
тобто Ви не можете змінювати такий список у потоках.

Підемо від простого: Ви маєте список з 4х елементів, два потоки "одержали" по два елементи (так ми гадаємо).
Перший поток вирішив, що треба видалити елемент після останнього (у його частки), а другий - видалити елемент перед першим (з його частки). Що повинно відбутися? Синхронізації у Вас не передбачено (якщо виключити цю маґічну змінну "go").
Може, розбити цей список на декілька автономних, обробити в потоках, а потім злити в один?
Я не знаю повністю Вашої задачі, просто спробую навести Вас самого на якесь прийнятне рішення.

Тим не менше, проблема ж зараз не в цьому, я не шукаю як обійти цю проблему, я шукаю як її вирішити.
Та й такі випадки, як Ви навели, дуже малоймовірні у мому випадку.
Синхронізацію я робитиму свою
Ви певно могли і не помітити, але

Також я хочу, щоб всі потоки почали роботу після "роздачі завдань", тобто передачі параметрів (для уникнення стану гонки). Якщо тут є кращий спосіб - кажіть

(Я знаю вумні слова)))))

10

Re: Мультипотоковість

Ніякого "стану гонки" і так не буде. Це ваше вумне слово (go) - абсолютно зайва деталь.
Десь у Вашій               

 current.Value.MakeStep(
                    GetMatrixConversion(current.Value.Age.Year,
                                        today.Month));

видаляється елемент списку, який одночасно обробляється паралельним потоком.
Треба змінювати всю архитектуру обробки. Самий простий і прямий спосіб - кожному потоку - свою колекцію, після обробки - merge.

11

Re: Мультипотоковість

mich_retten написав:

Ніякого "стану гонки" і так не буде. Це ваше вумне слово (go) - абсолютно зайва деталь.
Десь у Вашій               

 current.Value.MakeStep(
                    GetMatrixConversion(current.Value.Age.Year,
                                        today.Month));

видаляється елемент списку, який одночасно обробляється паралельним потоком.
Треба змінювати всю архитектуру обробки. Самий простий і прямий спосіб - кожному потоку - свою колекцію, після обробки - merge.

Стан гонки полягає в тому, що підлеглий поток може досить швидко якийсь елемент видалити, якраз в той момент, коли

                for (long j = 0; j < interval; j++)
                    current = current.Next;


виконуватиметься у головному потоці, тоді або помилка, або список закінчиться, або ще щось

Різні колекції різним потокам - надто багато роботи із пам'яттю, хоча якщо мені вдасться реалізувати якийсь швидкий і легкий алгоритм - то я так зроблю, АЛЕ мені це не допоможе, якщо і надалі передаватиметься оригінал змінної у потік, тоді як мала б передаватись копія.
Я зараз на етапі перегляду IL коду, з надією що може там я знайду відповідь "чому передається оригінал"

12

Re: Мультипотоковість

vitia444 написав:

може там я знайду відповідь "чому передається оригінал"

Що означає "передається оригінал"? Наведіть код, який Ви маєте на увазі.

13 Востаннє редагувалося vitia444 (18.06.2015 16:20:12)

Re: Мультипотоковість

                 threads[i] = Task.Run(() =>
                        {
                            MakeStepForPopulation_thread(first, interval);
                        });

Я просто думаю, що тут або передається оригінал замість копії, або їх ініціалізація (параметрів методу) відбувається трохи пізнувато (по завершеню циклу), інакше яка може бути причина, що потоки отримали однакові вхідні дані в цьому коді?

14

Re: Мультипотоковість

Доречі, зверніть увагу, у Вас перші interval елементів списку не обробляються жодним потоком: Ви спочатку берете перший елемент, потім пропускаєте interval елементів, і наступний елемент віддається потоку.

15

Re: Мультипотоковість

mich_retten написав:

Доречі, зверніть увагу, у Вас перші interval елементів списку не обробляються жодним потоком: Ви спочатку берете перший елемент, потім пропускаєте interval елементів, і наступний елемент віддається потоку.

Не зрозумів, що Ви сказали, але в будь-якому випадку в однопоточному режимі все оброблюється правильно (в тому числі перші елементи)

16

Re: Мультипотоковість

vitia444 написав:
mich_retten написав:

Доречі, зверніть увагу, у Вас перші interval елементів списку не обробляються жодним потоком: Ви спочатку берете перший елемент, потім пропускаєте interval елементів, і наступний елемент віддається потоку.

Не зрозумів, що Ви сказали, але в будь-якому випадку в однопоточному режимі все оброблюється правильно (в тому числі перші елементи)

Поставтпе breakpoint на рядку 16:

                        MakeStepForPopulation_thread(first, interval);

і подивіться, що знаходиться на цей момент у first. Гадаю, не перший елемент списку, а той, що відстоїть interval від першого.

17

Re: Мультипотоковість

mich_retten написав:

Поставтпе breakpoint на рядку 16:

                        MakeStepForPopulation_thread(first, interval);

і подивіться, що знаходиться на цей момент у first. Гадаю, не перший елемент списку, а той, що відстоїть interval від першого.

Я вже ставив там breakPoint, через що і кажу - там все в порядку, наведу арумент чому:

                first = current;
                for (long j = 0; j < interval; j++)
                    current = current.Next;

цей цикл first не змінює (знаю що тупо, але це не головна проблема зараз)

18 Востаннє редагувалося vitia444 (20.06.2015 00:45:52)

Re: Мультипотоковість

Все, помилка знайдена)
Причина - банально практики не хватає з мультипоточкою, якби вона була - не допустив би такої помилки
Сама помилка в наступному:

           for (int i = 0; i < THREAD_COUNT - 1; i++)
           {//проходить роздача параметрів потокам
                first = current;
                for (long j = 0; j < interval; j++)
                    current = current.Next;
 
                threads[i] = Task.Run(() =>
                        {
                            MakeStepForPopulation_thread(first, interval);
                        });
            }
            //останній потік в мене особливий, має починатись так
            threads[THREAD_COUNT - 1] = Task.Run(() =>
            {
                MakeStepForPopulation_thread(current, -1);
            });

Не зважаючи на те, що завдання таскам я присвоював поступово, тіло завдань починало виконуватись трохи пізніше, а саме після того, як весь цикл пройде, таким чином first вже був кінцевим, навіть для того потоку, який мав би початись першим. Вирішив це наступним методом:

        private int startedThreadCount;

        public void MakeStepForPopulation()
        {
            LinkedListNode<Homosapien> current = population.First;
            //LinkedListNode<Homosapien> next;
            LinkedListNode<Homosapien> first;
            
            long interval = population.Count / THREAD_COUNT;
            
            startedThreadCount = 0;

            for (int i = 0; i < THREAD_COUNT - 1; i++)
            {
                first = current;
                for (long j = 0; j < interval; j++)
                    current = current.Next;

                threads[i] = Task.Run(() =>
                        {
                            MakeStepForPopulation_thread(first, interval);
                        });
//тут чекаємо, поки поток не почнеться, щоб нормально задались параметри
                while (startedThreadCount <= i)
                    System.Threading.Thread.Sleep(1);
            }

            threads[THREAD_COUNT - 1] = Task.Run(() =>
            {
                MakeStepForPopulation_thread(current, -1);
            });

            Task.WaitAll(threads);
        }

        public void MakeStepForPopulation_thread(LinkedListNode<Homosapien> current, long interval)
        {
//відзначається вхід потоку
            startedThreadCount++;
//ждем, поки всі потоки не почнуться
            while (startedThreadCount != THREAD_COUNT)
                System.Threading.Thread.Sleep(10);

            LinkedListNode<Homosapien> next;

            while (!(interval == 0 || current == null))
            {
                next = current.Next;
                current.Value.MakeStep(
                    GetMatrixConversion(current.Value.Age.Year,
                                        today.Month));

                interval--;
                current = next;
            }
        }