Another midnight scraper tool
March 27, 2020
Python, Pip Requirements

  • linkedin_interview_prep_downloader.py
  • requirements.txt
import asyncio
import os
import shlex

from asgiref.sync import sync_to_async
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait


class Crawler(object):
    """Collect and download question/answer video links with a Chrome session.

    The crawler is driven by ``path``, a mapping whose values are lists of
    4-tuples ``(marker_type, marker, vrange, action)``:

    * ``marker_type`` -- ``'selector'`` (CSS selector) or ``'xpath'``.
    * ``marker`` -- the selector / XPath expression itself.
    * ``vrange`` -- when ``>= 1`` only the first ``vrange`` matches are
      kept; any other value (e.g. ``-1``) keeps every match.
    * ``action`` -- one of ``'click'``, ``'text'``, ``'src'``,
      ``'username'`` or ``'password'`` (see ``_run_action``).

    The keys read by this class are ``ROOT_PATH``, ``ROOT_PATH_QUESTIONS``,
    ``ROOT_PATH_ANSWERS`` and ``ROOT_PATH_USER_LOGIN``.

    ``launch`` fills ``all_video_data``; ``download_videos`` drains it into
    ``processed_video_data`` / ``failed_video_data``.
    """

    def __init__(self, url: str, path: dict, cookies: list = None,
                 headers: dict = None, test_mode: bool = False, timeout=30,
                 username=None, password=None) -> None:
        """Store the crawl configuration; no browser is started here.

        Args:
            url: Page opened by ``launch``.
            path: Marker mapping described in the class docstring.
            cookies: Optional list of cookie dicts added to the browser
                after the first page load (takes precedence over login).
            headers: Stored on the instance; not used by this class.
            test_mode: Stored on the instance; not used by this class.
            timeout: Selenium implicit wait, in seconds.
            username: Login name typed by the ``'username'`` action.
            password: Password typed by the ``'password'`` action.

        Raises:
            ValueError: If ``path`` is empty and no module-level ``PATH``
                fallback exists.
        """
        self.url = url
        self.cookies = cookies
        self.headers = headers or {}
        self.test_mode = test_mode
        # Fall back to the module-level PATH (defined by the script entry
        # point) without raising NameError when the module is imported.
        self.path = path or globals().get('PATH')
        if not self.path:
            raise ValueError('A marker path mapping is required.')
        self.time_out = timeout
        self.username = username
        self.password = password

        self.all_video_data = []
        self.processed_video_data = []
        self.failed_video_data = []
        self.browser = None

    async def launch(self):
        """Start Chrome, authenticate (cookies or login) and run the crawl.

        The browser is always quit and ``self.browser`` reset to ``None``
        when this coroutine finishes, even on error; ``download_videos``
        relies on that to know when no more links will arrive.
        """
        # `options=` replaces the deprecated `chrome_options=` keyword.
        self.browser = webdriver.Chrome(options=webdriver.ChromeOptions())
        try:
            self.browser.maximize_window()
            self.browser.implicitly_wait(self.time_out)
            self.browser.get(self.url)
            if self.cookies:
                for cookie in self.cookies:
                    self.browser.add_cookie(cookie)
                # Reload so the page is rendered with the cookies applied.
                self.browser.refresh()
            elif self.username and self.password:
                await self.login()
            await self.crawl()
        finally:
            try:
                self.browser.quit()
            finally:
                self.browser = None

    async def login(self):
        """Run every step of ``ROOT_PATH_USER_LOGIN`` on its first match."""
        for sub_path in self.path['ROOT_PATH_USER_LOGIN']:
            targets, action = await self._get_markers(sub_path)
            await self._run_action(action, targets[0])

    async def get_video_link(self, sub_path):
        """Execute the steps in ``sub_path`` and return the video ``src``.

        A step whose markers cannot be resolved is reported on stdout and
        skipped so the remaining steps (e.g. closing the modal) still run.

        Returns:
            The value produced by the last ``'src'`` action, or ``''`` when
            no such action succeeded.
        """
        video_link = ''
        for p in sub_path:
            try:
                elements, action = await self._get_markers(p)
            except Exception as e:
                print(f'ERROR OCCURRED: Failed to execute self._get_markers({p}) - {e}')
            else:
                for el in elements:
                    value = await self._run_action(action, el)

                    if action == 'src':
                        video_link = value
        return video_link

    @sync_to_async
    def _run_action(self, action, element):
        """Perform ``action`` on ``element`` and return its result.

        ``'text'`` returns the element's ``innerText`` and ``'src'`` its
        ``src`` attribute. ``'click'`` scrolls the element into view and
        clicks it once enabled. ``'username'`` / ``'password'`` type the
        matching instance attribute once the element is enabled and
        displayed. Clicks, typing and unknown actions return ``None``.
        """
        value = None
        if action == 'click':
            self.browser.execute_script("arguments[0].scrollIntoView();", element)
            value = WebDriverWait(self.browser, 30).until(
                lambda _driver: element
                if element and element.is_enabled()
                else False
            ).click()

        elif action == 'text':
            value = element.get_attribute('innerText')

        elif action == 'src':
            value = element.get_attribute('src')

        elif action in ('username', 'password'):
            value = WebDriverWait(self.browser, 15).until(
                lambda _driver: element
                if element and element.is_enabled() and element.is_displayed()
                else False
            ).send_keys(getattr(self, action))
        return value

    @sync_to_async
    def _get_markers(self, sub_path):
        """Resolve one ``(marker_type, marker, vrange, action)`` step.

        For a ``'src'`` action the lookup happens inside the first iframe of
        the page; for every other action the driver is moved back to the
        parent frame first.

        Returns:
            ``(targets, action)`` where ``targets`` is the non-empty list of
            matched elements, truncated to ``vrange`` items when
            ``vrange >= 1``.

        Raises:
            ValueError: If ``marker_type`` is not ``'selector'``/``'xpath'``.
            IndexError: If a ``'src'`` step finds no iframe on the page.
            selenium TimeoutException: If nothing matches within 10 seconds.
        """
        marker_type, marker, vrange, action = sub_path

        locators = {'selector': By.CSS_SELECTOR, 'xpath': By.XPATH}
        if marker_type not in locators:
            raise ValueError(f'Invalid marker type: {marker_type}')

        if action == 'src':
            iframes = self.browser.find_elements(By.TAG_NAME, 'iframe')
            if not iframes:
                raise IndexError('No iframe found to read the video source from.')
            self.browser.switch_to.frame(iframes[0])
        else:
            self.browser.switch_to.parent_frame()

        # An empty result list is falsy, so the wait retries until at least
        # one element matches or the 10 second timeout expires.
        by = locators[marker_type]
        targets = WebDriverWait(self.browser, 10).until(
            lambda driver: driver.find_elements(by, marker),
            message=f'XPathError: {marker} not found.'
            if marker_type == 'xpath'
            else f'SelectorError: {marker} not found.'
        )

        # Only a positive vrange limits the result; -1 (or 0) means "all".
        if vrange >= 1:
            targets = targets[:vrange]
        return targets, action

    async def crawl(self):
        """Visit every entry of the video list and record its video links.

        For each list item matched by ``ROOT_PATH[0]`` the item is clicked,
        its title read via ``ROOT_PATH[1]``, and the question / answer
        links collected. One dict with ``q_name``, ``q_link``, ``a_name``
        and ``a_link`` is appended to ``all_video_data`` per item.

        Raises:
            ValueError: If the title marker yields no element.
        """
        video_pages, action = await self._get_markers(self.path['ROOT_PATH'][0])
        print(self.path['ROOT_PATH'][0])
        print(len(video_pages))
        for i, page_element in enumerate(video_pages):
            self.browser.switch_to.parent_frame()
            self.browser.execute_script("arguments[0].scrollIntoView();", page_element)
            page_element.click()

            # Get video title
            video_title_element, action = await self._get_markers(self.path['ROOT_PATH'][1])
            if len(video_title_element) >= 1:
                video_title = await self._run_action(action, video_title_element[0])
            else:
                raise ValueError('No markers found')

            video_question_link = await self.get_video_link(self.path['ROOT_PATH_QUESTIONS'])
            print(f'Found question src: {video_question_link}')

            video_answer_link = await self.get_video_link(self.path['ROOT_PATH_ANSWERS'])
            print(f'Found answer src: {video_answer_link}')

            self.all_video_data.append({
                'q_name': f'{i}_{video_title}_question',
                'q_link': video_question_link,
                'a_name': f'{i}_{video_title}_answer',
                'a_link': video_answer_link
            })
            # Hide the processed list item (presumably so the next item can
            # be scrolled to and clicked without obstruction -- unverified).
            self.browser.execute_script("arguments[0].style.display='none';", page_element)

    @staticmethod
    def _safe_file_name(name):
        """Return ``name`` without dots or path separators, stripped."""
        return name.replace('.', '').replace(os.sep, '_').strip()

    async def _download(self, name, link, download_dir, dry_run):
        """Fetch ``link`` into ``download_dir`` with wget; return success.

        The command is executed without a shell so URLs containing ``&`` or
        titles containing quotes cannot break or inject into it. In
        ``dry_run`` mode the command is only printed and counts as success.
        """
        print(f"Downloading video...: {name}")
        file_name = os.path.join(download_dir, f"{self._safe_file_name(name)}.mp4")
        command = ['wget', link, '-O', file_name]
        if dry_run:
            printable = ' '.join(shlex.quote(part) for part in command)
            print(f"Issuing command: {printable}")
            return True
        try:
            process = await asyncio.create_subprocess_exec(*command)
        except OSError as e:
            # e.g. wget is not installed or not on PATH.
            print(f'ERROR OCCURRED: Failed to start wget - {e}')
            return False
        # returncode is None until the process has been waited for.
        return await process.wait() == 0

    async def download_videos(self, dry_run=False):
        """Download every collected video into ``~/Downloads/Linkedin-Interview-Prep``.

        Runs until ``all_video_data`` is empty and the browser has been shut
        down by ``launch``. When scheduled together with ``launch``, this
        coroutine must start after ``launch`` has created the browser,
        otherwise the loop ends immediately.

        Successfully fetched name/link pairs are appended to
        ``processed_video_data``; whatever could not be fetched (missing
        link or non-zero wget exit status) is appended to
        ``failed_video_data``.

        Args:
            dry_run: When true, print the wget commands instead of running
                them and treat each as successful.
        """
        download_dir = os.path.join(
            os.path.expanduser('~'), 'Downloads', 'Linkedin-Interview-Prep'
        )
        os.makedirs(download_dir, exist_ok=True)
        print(self.all_video_data)
        while self.all_video_data or self.browser:
            print('Downloading...')
            if self.all_video_data:
                video_data = self.all_video_data.pop(0)
                for prefix in ('q', 'a'):
                    name_key, link_key = f'{prefix}_name', f'{prefix}_link'
                    name = video_data.get(name_key)
                    link = video_data.get(link_key)
                    if not (name and link):
                        continue
                    if await self._download(name, link, download_dir, dry_run):
                        self.processed_video_data.append({
                            name_key: video_data.pop(name_key),
                            link_key: video_data.pop(link_key)
                        })

                # Only entries with something left over actually failed.
                if video_data:
                    self.failed_video_data.append(video_data)
            # Yield to the crawler so it can queue more links.
            await asyncio.sleep(2)



if __name__ == '__main__':
    # Marker tables consumed by Crawler. Each entry is
    # (marker_type, marker, vrange, action); see Crawler._get_markers.
    # PATH stays a module-level name because Crawler falls back to it.
    PATH = dict(
        ROOT_PATH=[
            ('selector', '#ember60 > section > div > div > div > ol > li', -1,
             'click'),  # Video List
            ('selector', (
                '#ember156 > div.interview-prep-question-details__container.container'
                '-with-shadow.p0.mb4 > header > h1'), 1, 'text')  # Video title
        ],

        ROOT_PATH_QUESTIONS=[
            # Video Question
            ('xpath', (  # Watch Question video
                '/html/body/div[6]/div[4]/div[3]/div[1]/div/article/div[1]/div[2]/article/button'),
             1, 'click'),
            # Play Question video
            ('xpath', '/html/body/div[4]/div/div/div[2]/div/div[1]/button', 1,
             'click'),
            # Get Question video
            ('xpath', '/html/body/div/div/div[5]/div[1]/video', 1, 'src'),
            # close Question video
            # ('xpath', '/html/body/div[4]/div/div/button', 1, 'click')
            ('selector', 'button[data-test-modal-close-btn=""]', 1, 'click')
        ],

        ROOT_PATH_ANSWERS=[
            # Video Answer
            ('xpath', (  # Watch Answer video
                '/html/body/div[6]/div[4]/div[3]/div[1]/div/article/div[1]/div[3]/article/button'),
             1, 'click'),
            # Play Answer video
            ('xpath', '/html/body/div[4]/div/div/div[2]/div/div[1]/button', 1,
             'click'),
            # Get Answer video
            ('xpath', '/html/body/div/div/div[5]/div[1]/video', 1, 'src'),
            # close Answer video
            # ('xpath', '/html/body/div[4]/div/div/button', 1, 'click')
            ('selector', 'button[data-test-modal-close-btn=""]', 1, 'click')
        ],

        ROOT_PATH_USER_LOGIN=[
            # Click Sign in
            ('xpath', '/html/body/div/main/p/a', 1, 'click'),
            # Enter username
            ('selector', '#username', 1, 'username'),
            # Enter password
            ('selector', '#password', 1, 'password'),
            # Submit form
            ('selector', '#app__container > main > div > form > div.login__form_action_container > button', 1, 'click')
            # ('xpath', '/html/body/div/main/div/form/div[3]/button', 1, 'click')
        ],
    )
    COOKIES = None

    APP_MODE = True
    site_url = ('https://www.linkedin.com/interview-prep/assessments'
               '/urn:li:fs_assessment:(1,a)/question'
               '/urn:li:fs_assessmentQuestion:(10011,aq11)/')
    # Credentials come from the environment so they never live in the file.
    username = os.getenv('LINKEDIN_USER')
    password = os.getenv('LINKEDIN_PASS')
    crawler = Crawler(site_url, PATH, COOKIES, username=username,
                      password=password, test_mode=APP_MODE, timeout=5)

    async def _crawl_and_download():
        """Run the crawler and the downloader concurrently.

        ``launch`` is listed first so the browser exists before
        ``download_videos`` evaluates its loop condition.
        """
        await asyncio.gather(
            crawler.launch(),
            crawler.download_videos(dry_run=True),
        )

    # Create the loop explicitly: asyncio.get_event_loop() without a running
    # loop is deprecated on recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(_crawl_and_download())
    finally:
        # Close the loop even when the crawl raised.
        loop.close()
    print(f"Unprocessed Videos:{len(crawler.all_video_data)} {crawler.all_video_data}\n"
          f"Processed Videos:{len(crawler.processed_video_data)} {crawler.processed_video_data}\n"
          f"Failed Videos:{len(crawler.failed_video_data)} {crawler.failed_video_data}\n")

Reach out

© 2024 Chuma Umenze. Some rights reserved.