Параллелизм в Golang и WorkerPool [Часть 2]

МЕНЮ


Искусственный интеллект
Поиск
Регистрация на сайте
Помощь проекту

ТЕМЫ


Новости ИИРазработка ИИВнедрение ИИРабота разума и сознаниеМодель мозгаРобототехника, БПЛАТрансгуманизмОбработка текстаТеория эволюцииДополненная реальностьЖелезоКиберугрозыНаучный мирИТ индустрияРазработка ПОТеория информацииМатематикаЦифровая экономика

Авторизация



RSS


RSS новости


2020-12-26 21:06

разработка по

Перевод публикуется с сокращениями, автор оригинальной статьи Ahad Hasan.

Горутины и каналы – это крутые языковые структуры, которые делают Golang мощным параллельным языком. Он отлично справляется с ограничениями использования ресурсов, что было продемонстрировано на простом примере в предыдущей публикации.

Для достижения цели можно использовать один общий канал. Давайте посмотрим, как все это реализовать.

Архитектура

Создаем пакет workerpool, который может обрабатывать задачи с воркерами на основе параллелизма. Рассмотрим структуру каталогов:

         workerpool ??? pool.go ??? task.go ??? worker.go     

Каталог workerpool находится в корне проекта. Теперь необходимо разобраться в терминах. Task – это самостоятельный элемент, который необходимо обработать. Worker – функция, обрабатывающая выполнение задачи. Pool фактически занимается созданием и управлением воркерами.

Реализация

Сначала напишем Task:

         // workerpool/task.go  package workerpool  import ( 	"fmt" )  type Task struct { 	Err  error 	Data interface{} 	f    func(interface{}) error }  func NewTask(f func(interface{}) error, data interface{}) *Task { 	return &Task{f: f, Data: data} }  func process(workerID int, task *Task) { 	fmt.Printf("Worker %d processes task %v ", workerID, task.Data) 	task.Err = task.f(task.Data) }     

Task содержит все необходимое для обработки задачи. Мы передаем ей Data и функцию f, которая должна быть выполнена, с помощью функции process. Функция f принимает Data в качестве параметра для обработки, а также храним возвращаемую ошибку. Давайте посмотрим, как Worker обрабатывает эти задачи:

         // workerpool/worker.go  package workerpool  import ( 	"fmt" 	"sync" )  // Worker контролирует всю работу type Worker struct { 	ID       int 	taskChan chan *Task }  // NewWorker возвращает новый экземпляр worker-а func NewWorker(channel chan *Task, ID int) *Worker { 	return &Worker{ 		ID:       ID, 		taskChan: channel, 	} }  // запуск worker func (wr *Worker) Start(wg *sync.WaitGroup) { 	fmt.Printf("Starting worker %d ", wr.ID)  	wg.Add(1) 	go func() { 		defer wg.Done() 		for task := range wr.taskChan { 			process(wr.ID, task) 		} 	}() }     

В коде Worker принимает идентификатор воркера и канал, в который должны быть записаны задачи. В методе Start входящие задачи распределяются по taskChan для обработки внутри goroutine.

Worker Pool

Мы реализовали Task и Worker для обработки задач, но здесь есть недостающая часть – порождение воркеров и отправка им заданий. Всем этим должен заведовать Worker Pool.

         // workerpoo/pool.go  package workerpool  import ( 	"fmt" 	"sync" 	"time" )  // Pool воркера type Pool struct { 	Tasks   []*Task  	concurrency   int 	collector     chan *Task 	wg            sync.WaitGroup }  // NewPool инициализирует новый пул с заданными задачами и  func NewPool(tasks []*Task, concurrency int) *Pool { 	return &Pool{ 		Tasks:       tasks, 		concurrency: concurrency, 		collector:   make(chan *Task, 1000), 	} }  // Run запускает всю работу в Pool и блокирует ее до тех пор,  // пока она не будет закончена. func (p *Pool) Run() { 	for i := 1; i <= p.concurrency; i++ { 		worker := NewWorker(p.collector, i) 		worker.Start(&p.wg) 	}  	for i := range p.Tasks { 		p.collector <- p.Tasks[i] 	} 	close(p.collector)  	p.wg.Wait() }     

Рабочий пул содержит все задачи, которые ему необходимо обработать, и принимает число в качестве входных данных, чтобы породить нужное количество горутин для одновременного выполнения задач. Он имеет буферизованный коллектор каналов, общий для всех рабочих.

Когда мы запускаем этот Worker Pool, он порождает необходимое количество воркеров, которые используют общий коллектор каналов. Далее мы разбираем задачи, записываем их в канал и синхронизируем все с WaitGroup. Теперь, давайте проверим наше решение:

         // main.go  package main  import ( 	"fmt" 	"time"  	"github.com/Joker666/goworkerpool/workerpool" )  func main() { 	var allTask []*workerpool.Task 	for i := 1; i <= 100; i++ { 		task := workerpool.NewTask(func(data interface{}) error { 			taskID := data.(int) 			time.Sleep(100 * time.Millisecond) 			fmt.Printf("Task %d processed ", taskID) 			return nil 		}, i) 		allTask = append(allTask, task) 	}  	pool := workerpool.NewPool(allTask, 5) 	pool.Run() }     

Создали 100 тасков и используем 5 процессов для их обработки. Взглянем на результат:

         Worker 3 processes task 98 Task 92 processed Worker 2 processes task 99 Task 98 processed Worker 5 processes task 100 Task 99 processed Task 100 processed Took ===============> 2.0056295s     

Для обработки 100 задач нам потребуется две секунды, а если мы поменяем 5 на 10, то увидим, что для обработки всех задач потребуется всего около одной секунды.

Мы создали надежное решение для Worker Pool, которое может обрабатывать concurrency, хранить ошибки в задаче и отправлять данные для обработки. Этот подход является универсальным и не связан с конкретной реализацией. Мы можем использовать его и для решения более серьезных проблем.

Дальнейшее расширение: обработка задач в фоне

Попробуем расширить наше приложение: worker-ы продолжают ждать новые зaдачи в фоновом режиме, а мы отправляем им новые для обработки. Для этого нужно будет немного изменить Worker:

         // workerpool/worker.go  // Worker контролирует всю работу type Worker struct { 	ID       int 	taskChan chan *Task 	quit     chan bool }  // NewWorker возвращает новый экземпляр worker-а func NewWorker(channel chan *Task, ID int) *Worker { 	return &Worker{ 		ID:       ID, 		taskChan: channel, 		quit:     make(chan bool), 	} }  ....  // StartBackground запускает worker-а в фоне func (wr *Worker) StartBackground() { 	fmt.Printf("Starting worker %d ", wr.ID)  	for { 		select { 		case task := <-wr.taskChan: 			process(wr.ID, task) 		case <-wr.quit: 			return 		} 	} }  // Остановка quits для воркера func (wr *Worker) Stop() { 	fmt.Printf("Closing worker %d ", wr.ID) 	go func() { 		wr.quit <- true 	}() }     

Мы добавляем канал выхода и два новых метода в структуру Worker. StartBackgorund запускает бесконечный цикл for с select для чтения из taskChan и обработки задачи. Если StartBackgorund читает из данного канала, то данные возвращаются из функции. Метод Stop записывает данные в канал quit.

Вооружившись двумя новыми методами, добавим в Pool несколько новых штук:

         // workerpool/pool.go  type Pool struct { 	Tasks   []*Task 	Workers []*Worker  	concurrency   int 	collector     chan *Task 	runBackground chan bool 	wg            sync.WaitGroup }  // AddTask добавляет таски в pool func (p *Pool) AddTask(task *Task) { 	p.collector <- task }  // RunBackground запускает pool в фоне func (p *Pool) RunBackground() { 	go func() { 		for { 			fmt.Print("? Waiting for tasks to come in ... ") 			time.Sleep(10 * time.Second) 		} 	}()  	for i := 1; i <= p.concurrency; i++ { 		worker := NewWorker(p.collector, i) 		p.Workers = append(p.Workers, worker) 		go worker.StartBackground() 	}  	for i := range p.Tasks { 		p.collector <- p.Tasks[i] 	}  	p.runBackground = make(chan bool) 	<-p.runBackground }  // Stop останавливает запущенных в фоне worker-ов func (p *Pool) Stop() { 	for i := range p.Workers { 		p.Workers[i].Stop() 	} 	p.runBackground <- true }     

Структура Pool теперь содержит воркеров и имеет канал runBackground, который помогает ему держаться на плаву. У нас появилось 3 новых метода. AddTask добавляет таску в коллектор канала.

Метод RunBackground работает бесконечно и порождает goroutine, чтобы поддерживать Pool живым вместе с каналом runBackground. Эта техника, позволяет запускать вечное выполнение чтения из пустого канала.

Метод Stop, останавливает воркеров и пишет в runBackground, чтобы завершить его. Посмотрим, как все это работает сейчас.

Если бы у нас был реальный пример из жизни, он работал бы вместе с HTTP-сервером и выполнял бы задачи. Повторим подобное поведение с бесконечным циклом и определенным условием:

         // main.go  ...  pool := workerpool.NewPool(allTask, 5) go func() { 	for { 		taskID := rand.Intn(100) + 20  		if taskID%7 == 0 { 			pool.Stop() 		}  		time.Sleep(time.Duration(rand.Intn(5)) * time.Second) 		task := workerpool.NewTask(func(data interface{}) error { 			taskID := data.(int) 			time.Sleep(100 * time.Millisecond) 			fmt.Printf("Task %d processed ", taskID) 			return nil 		}, taskID) 		pool.AddTask(task) 	} }() pool.RunBackground()     

После запуска кода, будет видно, что в очередь вставляется случайная задача, пока воркеры работают в фоновом режиме, но в итоге один из них эту задачу возьмет. Он остановится, когда будет выполнено соответствующее условие.

Вывод

Мы изучили, как можно построить надежное решение с Worker Pool из первой части цикла. Кроме того мы расширили возможности реализации пула, работающего в фоновом режиме для выполнения дальнейших входящих задач.

Источники


Источник: proglib.io

Комментарии: