从钉钉后台对接考勤打卡信息(仅供参考)

Posted 小蜗牛爱远行

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了从钉钉后台对接考勤打卡信息(仅供参考)相关的知识,希望对你有一定的参考价值。

# -*- coding: utf-8 -*-
import requests, logging, json, openpyxl, os, time
#官方下载SDK
import dingtalk.api
from datetime import datetime as dt
from datetime import timedelta
import xlsxwriter
import configparser
import datetime

logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s  line:%(lineno)d  %(levelname)s : %(message)s',
                    datefmt=' %Y-%m-%d %H:%M:%S',
                    filename=os.path.join(os.getcwd(), 'dingInfos2.log'),
                    filemode='a')

data_time = str(dt.now())
last_time = str(datetime.date.today() + datetime.timedelta(-1)) + ' 00:00:00'
last_time1 = str(datetime.date.today() + datetime.timedelta(-1)) + ' 24:00:00'
time1 = dt.now().strftime('%Y-%m-%d') + ' 11:00:00'
time2 = dt.now().strftime('%Y-%m-%d') + ' 09:00:00'
config = configparser.ConfigParser()
config.read(os.path.join(os.getcwd(), "config.ini"), "UTF-8")


class Dingding():
    def __init__(self):
        self.appkey = config['***']['AppKey']
        self.appsecret = config['***']['AppSecret']
        self.access_token = self.getToken()
        self.userids = self.get_userid()

    def getToken(self):
        url = 'https://oapi.dingtalk.com/gettoken?appkey=' + self.appkey + '&appsecret=' + self.appsecret
        response = requests.get(url=url)
        result = response.json()
        try:
            access_token = result['access_token']
            logging.info('获取密钥成功')
        except Exception as e:
            errmsg = result['errmsg']
            logging.info(e, '\\n', errmsg)
            access_token = ''
        return access_token
        
	#先获取所有ID
    def get_userid(self):
        userids = []
        request = dingtalk.api.OapiSmartworkHrmEmployeeQueryonjobRequest("https://oapi.dingtalk.com/topapi/smartwork/hrm/employee/queryonjob")
        request.status_list = "2,3,5,-1"
        request.offset = 0
        request.size = 50
        result = request.getResponse(self.access_token)
        userids.extend(result["result"]["data_list"])
        while "next_cursor" in result["result"]:
            request.offset = request.offset + request.size
            result = request.getResponse(self.access_token)
            if result['errcode'] == 9006 or result['errcode'] == 9005:
                time.sleep(60)
                result = request.getResponse(self.access_token)
                userids.extend(result["result"]["data_list"])
            elif result['errcode'] == 90018:
                time.sleep(1)
                result = request.getResponse(self.access_token)
                userids.extend(result["result"]["data_list"])
            else:
                pass
            userids.extend(result['result']['data_list'])
        logging.info(userids)
        logging.info('获取部门userids成功')
        return userids
	
	#目前钉钉只支持从部门查取对应的工号
    def get_jobnum(self):
        uid_jnum = {}  # {"员工ID":"员工工号"}
        userids = self.userids
        request = dingtalk.api.OapiUserGetDeptMemberRequest("https://oapi.dingtalk.com/user/get")
        request2 = dingtalk.api.OapiSmartworkHrmEmployeeListRequest(
            "https://oapi.dingtalk.com/topapi/smartwork/hrm/employee/list")
        for uid in userids:
            try:
                request.userid = uid
                f = request.getResponse(self.access_token)
                if f['errcode'] == 9006 or f['errcode'] == 9005:
                    time.sleep(60)
                    f = request.getResponse(self.access_token)
                elif f['errcode'] == 90018:
                    time.sleep(1)
                    f = request.getResponse(self.access_token)
                else:
                    pass
                if 'jobnumber' in f:
                    uid_jnum[f['userid']] = f['jobnumber']
            except :
                print(uid)
                request2.userid_list = uid
                try:
                    resp = request2.getResponse(self.access_token)
                    print(resp)
                except Exception as e:
                    logging.info("花名册获取请求报错:", e)
                if "result" in resp:
                    if resp["result"][0]["field_list"]:
                        for item in resp["result"][0]["field_list"]:
                            try:
                                if "field_name" in item:
                                    if item["field_name"] == "工号":
                                        uid_jnum[uid] = item["value"]
                            except Exception as e:
                                logging.info("获取姓名、工号报错:", e)
        logging.info('员工ID与工号对照字典已生成')
        return uid_jnum


	#获取打卡信息
    def get_attendence_listrecord(self, userids):
        day = 0
        request = dingtalk.api.OapiAttendanceListRecordRequest('https://oapi.dingtalk.com/attendance/list')

        if data_time < time2:
            request.workDateFrom = last_time
            request.workDateTo = last_time1
        else:
            request.workDateFrom = data_time
            request.workDateTo = data_time

        request.workDateTo = str(dt.now())
        request.userIdList = userids
        request.offset = 0
        request.limit = 50
        f = request.getResponse(self.access_token)
        h = []
        h.append(f)
        while ('hasMore', True) in f.items():
            request.offset = request.offset + request.limit
            # request.limit += 49
            f = request.getResponse(self.access_token)
            h.append(f)
        return h

    def get_value(self):
        TY = []
        uid_jnum = self.get_jobnum()
        userid_in = self.userids
        users = []
        if len(userid_in) > 50:

            start = 0
            end = 49
            while end < len(userid_in):
                users.append(userid_in[start:end])
                start += 49
                end = start + 50
                users.append(userid_in[start:end])
        print(users)
        for userid_in_in in users:
            reponse_all = self.get_attendence_listrecord(userid_in_in)
            if reponse_all[0]['errmsg'] != 'ok':
                reponse_all = self.get_attendence_listrecord(userid_in_in)
            for respose in reponse_all:
                try:
                    for respose in respose['recordresult']:
                        if respose['sourceType'] != 'SYSTEM':
                            if uid_jnum[respose['userId']]:
                                str_time = str(dt.fromtimestamp(int(respose['userCheckTime']) / 1000)).split(' ')
                                value = (uid_jnum[respose['userId']], str_time[0], str_time[1])
                                TY.append(value)
                except Exception as e:
                    logging.info(e)
        logging.info('获取部门所有考勤结果成功')
        return list(set(TY))


kq_ding = Dingding()
daka_result = kq_ding.get_value()

shr_excel = os.path.join(os.getcwd(), 'dingdingData2.xlsx')
test_book = xlsxwriter.Workbook(shr_excel)
worksheet = test_book.add_worksheet(u'打卡记录导入')
row = 4
col = 0
worksheet.write(0, 0, u'实体名称')
worksheet.write(0, 1, u'原始打卡记录实体')
worksheet.write(1, 0, u'实体表')
worksheet.write(1, 1, 'T_HR_ATS_PunchCardRecord')
worksheet.write(2, 0, u'描述')
worksheet.write(2, 1, u'原始打卡记录实体')
worksheet.write(3, 0, u'考勤编号')
worksheet.write(3, 1, u'打卡日期')
worksheet.write(3, 2, u'打卡时间')
worksheet.write(3, 3, u'打卡位置')
for item in daka_result:
    if item[2] < '10:00:00':
        worksheet.write(row, col, item[0])
        worksheet.write(row, col+1, item[1])
        worksheet.write(row, col+2, item[2])
        row += 1
    else:
        if item[2] > '10:00:00':
            worksheet.write(row, col, item[0])
            worksheet.write(row, col+1, item[1])
            worksheet.write(row, col+2, item[2])
            row += 1
test_book.close()
logging.info('打卡记录已经写入完成')






以上是关于从钉钉后台对接考勤打卡信息(仅供参考)的主要内容,如果未能解决你的问题,请参考以下文章

从钉钉后台对接考勤打卡信息(仅供参考)

从钉钉后台获取考勤数据(其他数据获取类似)

从钉钉后台获取考勤数据(其他数据获取类似)

从钉钉后台获取考勤数据(其他数据获取类似)

如何实现钉钉考勤手机异地打卡

钉钉打卡可以同步到企业微信吗